Add repair streaming preview Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13257
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4cfaf855 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4cfaf855 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4cfaf855 Branch: refs/heads/trunk Commit: 4cfaf855c404256a9dd281d5066cc076232d72ff Parents: 1e20d95 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Thu Feb 23 09:35:04 2017 -0800 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Mon Apr 24 09:21:33 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 6 +- doc/gen-nodetool-docs.py | 2 +- doc/source/operating/repair.rst | 87 ++++++- .../db/compaction/CompactionManager.java | 24 +- .../org/apache/cassandra/dht/RangeStreamer.java | 3 +- .../cassandra/exceptions/RepairException.java | 12 +- .../cassandra/io/sstable/SSTableLoader.java | 2 +- .../net/IncomingStreamingConnection.java | 2 +- .../apache/cassandra/repair/LocalSyncTask.java | 16 +- .../apache/cassandra/repair/RemoteSyncTask.java | 17 +- .../org/apache/cassandra/repair/RepairJob.java | 39 ++-- .../apache/cassandra/repair/RepairJobDesc.java | 6 + .../repair/RepairMessageVerbHandler.java | 15 +- .../apache/cassandra/repair/RepairRunnable.java | 100 +++++++- .../apache/cassandra/repair/RepairSession.java | 42 ++-- .../cassandra/repair/StreamingRepairTask.java | 15 +- .../org/apache/cassandra/repair/SyncStat.java | 18 +- .../org/apache/cassandra/repair/SyncTask.java | 7 +- .../apache/cassandra/repair/ValidationTask.java | 7 +- .../org/apache/cassandra/repair/Validator.java | 16 +- .../repair/consistent/SyncStatSummary.java | 233 +++++++++++++++++++ .../repair/messages/PrepareMessage.java | 13 +- .../cassandra/repair/messages/RepairOption.java | 41 +++- .../cassandra/repair/messages/SyncComplete.java | 40 +++- .../cassandra/repair/messages/SyncRequest.java | 16 +- .../cassandra/service/ActiveRepairService.java | 38 ++- .../cassandra/streaming/ConnectionHandler.java | 23 +- .../apache/cassandra/streaming/PreviewKind.java | 76 ++++++ .../apache/cassandra/streaming/SessionInfo.java | 7 + .../cassandra/streaming/SessionSummary.java | 141 +++++++++++ .../cassandra/streaming/StreamCoordinator.java | 8 +- .../apache/cassandra/streaming/StreamPlan.java | 11 +- .../cassandra/streaming/StreamReceiveTask.java | 3 + .../cassandra/streaming/StreamResultFuture.java | 9 +- .../cassandra/streaming/StreamSession.java | 57 ++++- .../apache/cassandra/streaming/StreamState.java | 7 + .../streaming/messages/StreamInitMessage.java | 10 +- .../apache/cassandra/tools/nodetool/Repair.java | 29 +++ .../serialization/4.0/gms.EndpointState.bin | Bin 0 -> 73 bytes test/data/serialization/4.0/gms.Gossip.bin | Bin 0 -> 158 bytes .../serialization/4.0/service.SyncComplete.bin | Bin 0 -> 538 bytes .../serialization/4.0/service.SyncRequest.bin | Bin 0 -> 227 bytes .../4.0/service.ValidationComplete.bin | Bin 0 -> 1251 bytes .../4.0/service.ValidationRequest.bin | Bin 0 -> 167 bytes .../4.0/utils.EstimatedHistogram.bin | Bin 0 -> 97500 bytes .../cassandra/AbstractSerializationsTester.java | 3 +- ...tionManagerGetSSTablesForValidationTest.java | 10 +- .../LeveledCompactionStrategyTest.java | 12 +- .../cassandra/dht/StreamStateStoreTest.java | 5 +- .../io/sstable/SSTableRewriterTest.java | 5 +- .../cassandra/repair/AbstractRepairTest.java | 4 +- .../cassandra/repair/LocalSyncTaskTest.java | 14 +- .../cassandra/repair/RepairSessionTest.java | 6 +- .../repair/StreamingRepairTaskTest.java | 9 +- .../apache/cassandra/repair/ValidatorTest.java | 9 +- .../AbstractConsistentSessionTest.java | 4 +- .../consistent/PendingAntiCompactionTest.java | 3 +- .../RepairMessageSerializationsTest.java | 17 +- .../service/ActiveRepairServiceTest.java | 14 +- .../cassandra/service/SerializationsTest.java | 23 +- .../cassandra/streaming/StreamSessionTest.java | 3 +- .../streaming/StreamTransferTaskTest.java | 6 +- 63 files changed, 1156 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c82c8ee..fc06bde 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add repair streaming preview (CASSANDRA-13257) * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430) * Change protocol to allow sending key space independent of query string (CASSANDRA-10145) * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 1ec4637..1c456b5 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -23,6 +23,8 @@ New features - Support for arithmetic operations between `timestamp`/`date` and `duration` has been added. See CASSANDRA-11936 - Support for arithmetic operations on number has been added. See CASSANDRA-11935 + - Preview expected streaming required for a repair (nodetool repair --preview), and validate the + consistency of repaired data between nodes (nodetool repair --validate). See CASSANDRA-13257 Upgrading --------- @@ -46,7 +48,9 @@ Upgrading data to be inconsistent between nodes. The fix changes the behavior of both full and incremental repairs. For full repairs, data is no longer marked repaired. For incremental repairs, anticompaction is run at the beginning - of the repair, instead of at the end. + of the repair, instead of at the end. If incremental repair was being used + prior to upgrading, a full repair should be run after upgrading to resolve + any inconsistencies. - Config option index_interval has been removed (it was deprecated since 2.0) - Deprecated repair JMX APIs are removed. http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/doc/gen-nodetool-docs.py ---------------------------------------------------------------------- diff --git a/doc/gen-nodetool-docs.py b/doc/gen-nodetool-docs.py index ae01534..e3862f7 100644 --- a/doc/gen-nodetool-docs.py +++ b/doc/gen-nodetool-docs.py @@ -27,7 +27,7 @@ nodetool = "../bin/nodetool" outdir = "source/tools/nodetool" helpfilename = outdir + "/nodetool.txt" command_re = re.compile("( )([_a-z]+)") -commandRSTContent = ".. _{0}\n\n{0}\n-------\n\nUsage\n---------\n\n.. include:: {0}.txt\n :literal:\n\n" +commandRSTContent = ".. _nodetool_{0}:\n\n{0}\n-------\n\nUsage\n---------\n\n.. include:: {0}.txt\n :literal:\n\n" # create the documentation directory if not os.path.exists(outdir): http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/doc/source/operating/repair.rst ---------------------------------------------------------------------- diff --git a/doc/source/operating/repair.rst b/doc/source/operating/repair.rst index 97d8ce8..97115dc 100644 --- a/doc/source/operating/repair.rst +++ b/doc/source/operating/repair.rst @@ -16,7 +16,92 @@ .. highlight:: none +.. _repair: + Repair ------ -.. todo:: todo +Cassandra is designed to remain available if one of it's nodes is down or unreachable. However, when a node is down or +unreachable, it needs to eventually discover the writes it missed. Hints attempt to inform a node of missed writes, but +are a best effort, and aren't guaranteed to inform a node of 100% of the writes it missed. These inconsistencies can +eventually result in data loss as nodes are replaced or tombstones expire. + +These inconsistencies are fixed with the repair process. Repair synchronizes the data between nodes by comparing their +respective datasets for their common token ranges, and streaming the differences for any out of sync sections between +the nodes. It compares the data with merkle trees, which are a hierarchy of hashes. + +Incremental and Full Repairs +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +There are 2 types of repairs: full repairs, and incremental repairs. Full repairs operate over all of the data in the +token range being repaired. Incremental repairs only repair data that's been written since the previous incremental repair. + +Incremental repairs are the default repair type, and if run regularly, can significantly reduce the time and io cost of +performing a repair. However, it's important to understand that once an incremental repair marks data as repaired, it won't +try to repair it again. This is fine for syncing up missed writes, but it doesn't protect against things like disk corruption, +data loss by operator error, or bugs in Cassandra. For this reason, full repairs should still be run occasionally. + +Usage and Best Practices +^^^^^^^^^^^^^^^^^^^^^^^^ + +Since repair can result in a lot of disk and network io, it's not run automatically by Cassandra. It is run by the operator +via nodetool. + +Incremental repair is the default and is run with the following command: + +:: + + nodetool repair + +A full repair can be run with the following command: + +:: + + nodetool repair --full + +Additionally, repair can be run on a single keyspace: + +:: + + nodetool repair [options] <keyspace_name> + +Or even on specific tables: + +:: + + nodetool repair [options] <keyspace_name> <table1> <table2> + + +The repair command only repairs token ranges on the node being repaired, it doesn't repair the whole cluster. By default, repair +will operate on all token ranges replicated by the node you're running repair on, which will cause duplicate work if you run it +on every node. The ``-pr`` flag will only repair the "primary" ranges on a node, so you can repair your entire cluster by running +``nodetool repair -pr`` on each node in a single datacenter. + +The specific frequency of repair that's right for your cluster, of course, depends on several factors. However, if you're +just starting out and looking for somewhere to start, running an incremental repair every 1-3 days, and a full repair every +1-3 weeks is probably reasonable. If you don't want to run incremental repairs, a full repair every 5 days is a good place +to start. + +At a minimum, repair should be run often enough that the gc grace period never expires on unrepaired data. Otherwise, deleted +data could reappear. With a default gc grace period of 10 days, repairing every node in your cluster at least once every 7 days +will prevent this, while providing enough slack to allow for delays. + +Other Options +^^^^^^^^^^^^^ + +``-pr, --partitioner-range`` + Restricts repair to the 'primary' token ranges of the node being repaired. A primary range is just a token range for + which a node is the first replica in the ring. + +``-prv, --preview`` + Estimates the amount of streaming that would occur for the given repair command. This builds the merkle trees, and prints + the expected streaming activity, but does not actually do any streaming. By default, incremental repairs are estimated, + add the ``--full`` flag to estimate a full repair. + +``-vd, --validate`` + Verifies that the repaired data is the same across all nodes. Similiar to ``--preview``, this builds and compares merkle + trees of repaired data, but doesn't do any streaming. This is useful for troubleshooting. If this shows that the repaired + data is out of sync, a full repair should be run. + +.. seealso:: + :ref:`nodetool repair docs <nodetool_repair>` http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index a3ca86a..6f311a2 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -29,6 +29,7 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.*; import com.google.common.util.concurrent.*; import org.slf4j.Logger; @@ -69,6 +70,7 @@ import org.apache.cassandra.repair.Validator; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.Refs; @@ -611,8 +613,11 @@ public class CompactionManager implements CompactionManagerMBean UUID pendingRepair, UUID parentRepairSession) throws InterruptedException, IOException { - logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables()); - logger.trace("[repair #{}] Starting anticompaction for ranges {}", parentRepairSession, ranges); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentRepairSession); + Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for previews"); + + logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables()); + logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession), ranges); Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // we should only notify that repair status changed if it actually did: @@ -640,7 +645,7 @@ public class CompactionManager implements CompactionManagerMBean { if (r.contains(sstableRange)) { - logger.info("[repair #{}] SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r); + logger.info("{} SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r); sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair); sstable.reloadSSTableMetadata(); mutatedRepairStatuses.add(sstable); @@ -652,14 +657,14 @@ public class CompactionManager implements CompactionManagerMBean } else if (sstableRange.intersects(r)) { - logger.info("[repair #{}] SSTable {} ({}) will be anticompacted on range {}", parentRepairSession, sstable, sstableRange, r); + logger.info("{} SSTable {} ({}) will be anticompacted on range {}", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableRange, r); shouldAnticompact = true; } } if (!shouldAnticompact) { - logger.info("[repair #{}] SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableRange, normalizedRanges); + logger.info("{} SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableRange, normalizedRanges); nonAnticompacting.add(sstable); sstableIterator.remove(); } @@ -678,7 +683,7 @@ public class CompactionManager implements CompactionManagerMBean txn.close(); } - logger.info("[repair #{}] Completed anticompaction successfully", parentRepairSession); + logger.info("{} Completed anticompaction successfully", PreviewKind.NONE.logPrefix(parentRepairSession)); } public void performMaximal(final ColumnFamilyStore cfStore, boolean splitOutput) @@ -1427,7 +1432,12 @@ public class CompactionManager implements CompactionManagerMBean Set<SSTableReader> sstablesToValidate = new HashSet<>(); com.google.common.base.Predicate<SSTableReader> predicate; - if (validator.isConsistent) + if (prs.isPreview()) + { + predicate = prs.getPreviewPredicate(); + + } + else if (validator.isConsistent) { predicate = s -> validator.desc.parentSessionId.equals(s.getSSTableMetadata().pendingRepair); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index fd976c9..85c2307 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -39,6 +39,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.StreamOperation; @@ -156,7 +157,7 @@ public class RangeStreamer this.tokens = tokens; this.address = address; this.description = streamOperation.getDescription(); - this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, true, connectSequentially, null); + this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, true, connectSequentially, null, PreviewKind.NONE); this.useStrictConsistency = useStrictConsistency; this.snitch = snitch; this.stateStore = stateStore; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/exceptions/RepairException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/RepairException.java b/src/java/org/apache/cassandra/exceptions/RepairException.java index 2f5f2c1..db219a2 100644 --- a/src/java/org/apache/cassandra/exceptions/RepairException.java +++ b/src/java/org/apache/cassandra/exceptions/RepairException.java @@ -18,6 +18,7 @@ package org.apache.cassandra.exceptions; import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.streaming.PreviewKind; /** * Exception thrown during repair @@ -25,22 +26,23 @@ import org.apache.cassandra.repair.RepairJobDesc; public class RepairException extends Exception { public final RepairJobDesc desc; + public final PreviewKind previewKind; public RepairException(RepairJobDesc desc, String message) { - super(message); - this.desc = desc; + this(desc, null, message); } - public RepairException(RepairJobDesc desc, String message, Throwable cause) + public RepairException(RepairJobDesc desc, PreviewKind previewKind, String message) { - super(message, cause); + super(message); this.desc = desc; + this.previewKind = previewKind != null ? previewKind : PreviewKind.NONE; } @Override public String getMessage() { - return desc + " " + super.getMessage(); + return desc.toString(previewKind) + ' ' + super.getMessage(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index e9ea35a..ff47bec 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler client.init(keyspace); outputHandler.output("Established connection to initial hosts"); - StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, false, null).connectionFactory(client.getConnectionFactory()); + StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, false, null, PreviewKind.NONE).connectionFactory(client.getConnectionFactory()); Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); openSSTables(endpointToRanges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index e5fdc99..8db5fcb 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -73,7 +73,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing. // Note: we cannot use the same socket for incoming and outgoing streams because we want to // parallelize said streams and the socket is blocking, so we might deadlock. - StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.pendingRepair); + StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.pendingRepair, init.previewKind); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 06b1661..5bb66dc 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -29,6 +29,7 @@ import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamEvent; import org.apache.cassandra.streaming.StreamEventHandler; @@ -38,6 +39,8 @@ import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; /** * LocalSyncTask performs streaming between local(coordinator) node and remote replica. @@ -51,9 +54,9 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler private final UUID pendingRepair; private final boolean pullRepair; - public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, UUID pendingRepair, boolean pullRepair) + public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind) { - super(desc, r1, r2); + super(desc, r1, r2, previewKind); this.pendingRepair = pendingRepair; this.pullRepair = pullRepair; } @@ -62,7 +65,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler @VisibleForTesting StreamPlan createStreamPlan(InetAddress dst, InetAddress preferred, List<Range<Token>> differences) { - StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair) + StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind) .listeners(this) .flushBeforeTransfer(pendingRepair == null) .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); // request ranges from the remote node @@ -79,6 +82,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback * that will be called out of band once the streams complete. */ + @Override protected void startSync(List<Range<Token>> differences) { InetAddress local = FBUtilities.getBroadcastAddress(); @@ -87,7 +91,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler InetAddress preferred = SystemKeyspace.getPreferredIP(dst); String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); - logger.info("[repair #{}] {}", desc.sessionId, message); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); createStreamPlan(dst, preferred, differences).execute(); @@ -122,9 +126,9 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler public void onSuccess(StreamState result) { String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); - logger.info("[repair #{}] {}", desc.sessionId, message); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); - set(stat); + set(stat.withSummaries(result.createSummaries())); } public void onFailure(Throwable t) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java index ededc40..a26a392 100644 --- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java @@ -28,6 +28,8 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.SyncRequest; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; @@ -41,30 +43,31 @@ public class RemoteSyncTask extends SyncTask { private static final Logger logger = LoggerFactory.getLogger(RemoteSyncTask.class); - public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2) + public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) { - super(desc, r1, r2); + super(desc, r1, r2, previewKind); } + @Override protected void startSync(List<Range<Token>> differences) { InetAddress local = FBUtilities.getBroadcastAddress(); - SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences); + SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind); String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst); - logger.info("[repair #{}] {}", desc.sessionId, message); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); MessagingService.instance().sendOneWay(request.createMessage(), request.src); } - public void syncComplete(boolean success) + public void syncComplete(boolean success, List<SessionSummary> summaries) { if (success) { - set(stat); + set(stat.withSummaries(summaries)); } else { - setException(new RepairException(desc, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint))); + setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint))); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index d1f6a94..d6c1176 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -42,6 +43,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private final RepairParallelism parallelismDegree; private final ListeningExecutorService taskExecutor; private final boolean isConsistent; + private final PreviewKind previewKind; /** * Create repair job to run on specific columnfamily @@ -49,13 +51,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable * @param session RepairSession that this RepairJob belongs * @param columnFamily name of the ColumnFamily to repair */ - public RepairJob(RepairSession session, String columnFamily, boolean isConsistent) + public RepairJob(RepairSession session, String columnFamily, boolean isConsistent, PreviewKind previewKind) { this.session = session; this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges()); this.taskExecutor = session.taskExecutor; this.parallelismDegree = session.parallelismDegree; this.isConsistent = isConsistent; + this.previewKind = previewKind; } /** @@ -128,11 +131,11 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable SyncTask task; if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) { - task = new LocalSyncTask(desc, r1, r2, isConsistent ? desc.parentSessionId : null, session.pullRepair); + task = new LocalSyncTask(desc, r1, r2, isConsistent ? desc.parentSessionId : null, session.pullRepair, session.previewKind); } else { - task = new RemoteSyncTask(desc, r1, r2); + task = new RemoteSyncTask(desc, r1, r2, session.previewKind); // RemoteSyncTask expects SyncComplete message sent back. // Register task to RepairSession to receive response. session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task); @@ -150,8 +153,11 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { public void onSuccess(List<SyncStat> stats) { - logger.info("[repair #{}] {} is fully synced", session.getId(), desc.columnFamily); - SystemDistributedKeyspace.successfulRepairJob(session.getId(), desc.keyspace, desc.columnFamily); + if (!previewKind.isPreview()) + { + logger.info("{} {} is fully synced", previewKind.logPrefix(session.getId()), desc.columnFamily); + SystemDistributedKeyspace.successfulRepairJob(session.getId(), desc.keyspace, desc.columnFamily); + } set(new RepairResult(desc, stats)); } @@ -160,8 +166,11 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable */ public void onFailure(Throwable t) { - logger.warn("[repair #{}] {} sync failed", session.getId(), desc.columnFamily); - SystemDistributedKeyspace.failedRepairJob(session.getId(), desc.keyspace, desc.columnFamily, t); + if (!previewKind.isPreview()) + { + logger.warn("{} {} sync failed", previewKind.logPrefix(session.getId()), desc.columnFamily); + SystemDistributedKeyspace.failedRepairJob(session.getId(), desc.keyspace, desc.columnFamily, t); + } setException(t); } }, taskExecutor); @@ -179,13 +188,13 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddress> endpoints) { String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); - logger.info("[repair #{}] {}", desc.sessionId, message); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds()); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); for (InetAddress endpoint : endpoints) { - ValidationTask task = new ValidationTask(desc, endpoint, gcBefore); + ValidationTask task = new ValidationTask(desc, endpoint, gcBefore, previewKind); tasks.add(task); session.waitForValidation(Pair.create(desc, endpoint), task); taskExecutor.execute(task); @@ -199,14 +208,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddress> endpoints) { String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); - logger.info("[repair #{}] {}", desc.sessionId, message); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds()); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); Queue<InetAddress> requests = new LinkedList<>(endpoints); InetAddress address = requests.poll(); - ValidationTask firstTask = new ValidationTask(desc, address, gcBefore); + ValidationTask firstTask = new ValidationTask(desc, address, gcBefore, previewKind); logger.info("Validating {}", address); session.waitForValidation(Pair.create(desc, address), firstTask); tasks.add(firstTask); @@ -214,7 +223,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable while (requests.size() > 0) { final InetAddress nextAddress = requests.poll(); - final ValidationTask nextTask = new ValidationTask(desc, nextAddress, gcBefore); + final ValidationTask nextTask = new ValidationTask(desc, nextAddress, gcBefore, previewKind); tasks.add(nextTask); Futures.addCallback(currentTask, new FutureCallback<TreeResponse>() { @@ -241,7 +250,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddress> endpoints) { String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); - logger.info("[repair #{}] {}", desc.sessionId, message); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds()); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); @@ -263,7 +272,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { Queue<InetAddress> requests = entry.getValue(); InetAddress address = requests.poll(); - ValidationTask firstTask = new ValidationTask(desc, address, gcBefore); + ValidationTask firstTask = new ValidationTask(desc, address, gcBefore, previewKind); logger.info("Validating {}", address); session.waitForValidation(Pair.create(desc, address), firstTask); tasks.add(firstTask); @@ -271,7 +280,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable while (requests.size() > 0) { final InetAddress nextAddress = requests.poll(); - final ValidationTask nextTask = new ValidationTask(desc, nextAddress, gcBefore); + final ValidationTask nextTask = new ValidationTask(desc, nextAddress, gcBefore, previewKind); tasks.add(nextTask); Futures.addCallback(currentTask, new FutureCallback<TreeResponse>() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RepairJobDesc.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java index be3daef..8f70e00 100644 --- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java +++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java @@ -32,6 +32,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.UUIDSerializer; /** @@ -66,6 +67,11 @@ public class RepairJobDesc return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]"; } + public String toString(PreviewKind previewKind) + { + return '[' + previewKind.logPrefix() + " #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]"; + } + @Override public boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index b6b9f87..ed62229 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -35,6 +35,7 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; /** * Handles all repair related message. @@ -50,6 +51,12 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> return ActiveRepairService.instance.consistent.local.isSessionInProgress(sessionID); } + private PreviewKind previewKind(UUID sessionID) + { + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); + return prs != null ? prs.previewKind : PreviewKind.NONE; + } + public void doVerb(final MessageIn<RepairMessage> message, final int id) { // TODO add cancel/interrupt message @@ -79,7 +86,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> prepareMessage.ranges, prepareMessage.isIncremental, prepareMessage.timestamp, - prepareMessage.isGlobal); + prepareMessage.isGlobal, + prepareMessage.previewKind); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; @@ -127,7 +135,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> } ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId); - Validator validator = new Validator(desc, message.from, validationRequest.gcBefore, isConsistent(desc.parentSessionId)); + Validator validator = new Validator(desc, message.from, validationRequest.gcBefore, + isConsistent(desc.parentSessionId), previewKind(desc.parentSessionId)); CompactionManager.instance.submitValidation(store, validator); break; @@ -135,7 +144,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> // forwarded sync request SyncRequest request = (SyncRequest) message.payload; logger.debug("Syncing {}", request); - StreamingRepairTask task = new StreamingRepairTask(desc, request, isConsistent(desc.parentSessionId) ? desc.parentSessionId : null); + StreamingRepairTask task = new StreamingRepairTask(desc, request, isConsistent(desc.parentSessionId) ? desc.parentSessionId : null, request.previewKind); task.run(); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index b8bef95..354d8b5 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.repair.consistent.SyncStatSummary; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; @@ -50,6 +51,7 @@ import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; @@ -218,7 +220,11 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti cfnames[i] = columnFamilyStores.get(i).name; } - SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options); + if (!options.isPreview()) + { + SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options); + } + long repairedAt; try { @@ -228,12 +234,19 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } catch (Throwable t) { - SystemDistributedKeyspace.failParentRepair(parentSession, t); + if (!options.isPreview()) + { + SystemDistributedKeyspace.failParentRepair(parentSession, t); + } fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage()); return; } - if (options.isIncremental()) + if (options.isPreview()) + { + previewRepair(parentSession, repairedAt, startTime, traceState, allNeighbors, commonRanges, cfnames); + } + else if (options.isIncremental()) { consistentRepair(parentSession, repairedAt, startTime, traceState, allNeighbors, commonRanges, cfnames); } @@ -311,6 +324,76 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, ranges, startTime, traceState, hasFailure, executor)); } + private void previewRepair(UUID parentSession, + long repairedAt, + long startTime, + TraceState traceState, + Set<InetAddress> allNeighbors, + List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges, + String... cfnames) + { + + logger.debug("Starting preview repair for {}", parentSession); + // Set up RepairJob executor for this repair command. + ListeningExecutorService executor = createExecutor(); + + final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames); + + Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>() + { + public void onSuccess(List<RepairSessionResult> results) + { + try + { + PreviewKind previewKind = options.getPreviewKind(); + assert previewKind != PreviewKind.NONE; + SyncStatSummary summary = new SyncStatSummary(true); + summary.consumeSessionResults(results); + + if (summary.isEmpty()) + { + String message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync"; + logger.info(message); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, message)); + } + else + { + String message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString(); + logger.info(message); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, message)); + } + + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, + "Repair preview completed successfully")); + complete(); + } + catch (Throwable t) + { + logger.error("Error completing preview repair", t); + onFailure(t); + } + } + + public void onFailure(Throwable t) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); + logger.error("Error completing preview repair", t); + complete(); + } + + private void complete() + { + logger.debug("Preview repair {} completed", parentSession); + + String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, + true, true); + String message = String.format("Repair preview #%d finished in %s", cmd, duration); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); + executor.shutdownNow(); + } + }); + } + private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession, boolean isConsistent, ListeningExecutorService executor, @@ -327,6 +410,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti p.left, isConsistent, options.isPullRepair(), + options.getPreviewKind(), executor, cfnames); if (session == null) @@ -405,7 +489,10 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti public void onSuccess(Object result) { - SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges); + if (!options.isPreview()) + { + SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges); + } if (hasFailure.get()) { fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, @@ -422,7 +509,10 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti public void onFailure(Throwable t) { fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); - SystemDistributedKeyspace.failParentRepair(parentSession, t); + if (!options.isPreview()) + { + SystemDistributedKeyspace.failParentRepair(parentSession, t); + } repairComplete(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 2aa068c..8aa9590 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -34,6 +34,8 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTrees; @@ -91,6 +93,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public final Collection<Range<Token>> ranges; public final Set<InetAddress> endpoints; public final boolean isConsistent; + public final PreviewKind previewKind; private final AtomicBoolean isFailed = new AtomicBoolean(false); @@ -125,6 +128,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement Set<InetAddress> endpoints, boolean isConsistent, boolean pullRepair, + PreviewKind previewKind, String... cfnames) { assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; @@ -137,6 +141,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.ranges = ranges; this.endpoints = endpoints; this.isConsistent = isConsistent; + this.previewKind = previewKind; this.pullRepair = pullRepair; } @@ -177,7 +182,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement } String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint); - logger.info("[repair #{}] {}", getId(), message); + logger.info("{} {}", previewKind.logPrefix(getId()), message); Tracing.traceRepair(message); task.treesReceived(trees); } @@ -189,7 +194,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement * @param nodes nodes that completed sync * @param success true if sync succeeded */ - public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success) + public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries) { RemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes)); if (task == null) @@ -198,8 +203,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement return; } - logger.debug("[repair #{}] Repair completed between {} and {} on {}", getId(), nodes.endpoint1, nodes.endpoint2, desc.columnFamily); - task.syncComplete(success); + logger.debug("{} Repair completed between {} and {} on {}", previewKind.logPrefix(getId()), nodes.endpoint1, nodes.endpoint2, desc.columnFamily); + task.syncComplete(success, summaries); } private String repairedNodes() @@ -225,16 +230,22 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement if (terminated) return; - logger.info("[repair #{}] new session: will sync {} on range {} for {}.{}", getId(), repairedNodes(), ranges, keyspace, Arrays.toString(cfnames)); + logger.info("{} new session: will sync {} on range {} for {}.{}", previewKind.logPrefix(getId()), repairedNodes(), ranges, keyspace, Arrays.toString(cfnames)); Tracing.traceRepair("Syncing range {}", ranges); - SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, ranges, endpoints); + if (!previewKind.isPreview()) + { + SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, ranges, endpoints); + } if (endpoints.isEmpty()) { - logger.info("[repair #{}] {}", getId(), message = String.format("No neighbors to repair with on range %s: session completed", ranges)); + logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", ranges)); Tracing.traceRepair(message); set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList())); - SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message)); + if (!previewKind.isPreview()) + { + SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message)); + } return; } @@ -244,10 +255,13 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement if (!FailureDetector.instance.isAlive(endpoint)) { message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint); - logger.error("[repair #{}] {}", getId(), message); + logger.error("{} {}", previewKind.logPrefix(getId()), message); Exception e = new IOException(message); setException(e); - SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, e); + if (!previewKind.isPreview()) + { + SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, e); + } return; } } @@ -256,7 +270,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length); for (String cfname : cfnames) { - RepairJob job = new RepairJob(this, cfname, isConsistent); + RepairJob job = new RepairJob(this, cfname, isConsistent, previewKind); executor.execute(job); jobs.add(job); } @@ -267,7 +281,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public void onSuccess(List<RepairResult> results) { // this repair session is completed - logger.info("[repair #{}] {}", getId(), "Session completed successfully"); + logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully"); Tracing.traceRepair("Completed sync of range {}", ranges); set(new RepairSessionResult(id, keyspace, ranges, results)); @@ -278,7 +292,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public void onFailure(Throwable t) { - logger.error(String.format("[repair #%s] Session completed with the following error", getId()), t); + logger.error("{} Session completed with the following error", previewKind.logPrefix(getId()), t); Tracing.traceRepair("Session completed with the following error: {}", t); forceShutdown(t); } @@ -335,7 +349,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement return; Exception exception = new IOException(String.format("Endpoint %s died", endpoint)); - logger.error(String.format("[repair #%s] session completed with the following error", getId()), exception); + logger.error(String.format("{} session completed with the following error", previewKind.logPrefix(getId())), exception); // If a node failed, we stop everything (though there could still be some activity in the background) forceShutdown(exception); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index 7042de1..f43010b 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -19,6 +19,7 @@ package org.apache.cassandra.repair; import java.net.InetAddress; import java.util.UUID; +import java.util.Collections; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -28,7 +29,7 @@ import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.SyncComplete; import org.apache.cassandra.repair.messages.SyncRequest; -import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamEvent; import org.apache.cassandra.streaming.StreamEventHandler; import org.apache.cassandra.streaming.StreamPlan; @@ -46,12 +47,14 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler private final RepairJobDesc desc; private final SyncRequest request; private final UUID pendingRepair; + private final PreviewKind previewKind; - public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, UUID pendingRepair) + public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, UUID pendingRepair, PreviewKind previewKind) { this.desc = desc; this.request = request; this.pendingRepair = pendingRepair; + this.previewKind = previewKind; } public void run() @@ -65,7 +68,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler @VisibleForTesting StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred) { - return new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair) + return new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind) .listeners(this) .flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) // request ranges from the remote node @@ -83,8 +86,8 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler */ public void onSuccess(StreamState state) { - logger.info("[repair #{}] streaming task succeed, returning response to {}", desc.sessionId, request.initiator); - MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, true).createMessage(), request.initiator); + logger.info("{} streaming task succeed, returning response to {}", previewKind.logPrefix(desc.sessionId), request.initiator); + MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, true, state.createSummaries()).createMessage(), request.initiator); } /** @@ -92,6 +95,6 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler */ public void onFailure(Throwable t) { - MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, false).createMessage(), request.initiator); + MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, false, Collections.emptyList()).createMessage(), request.initiator); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/SyncStat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncStat.java b/src/java/org/apache/cassandra/repair/SyncStat.java index 5721a20..dab5659 100644 --- a/src/java/org/apache/cassandra/repair/SyncStat.java +++ b/src/java/org/apache/cassandra/repair/SyncStat.java @@ -17,17 +17,33 @@ */ package org.apache.cassandra.repair; +import java.util.List; + +import org.apache.cassandra.streaming.SessionSummary; + /** * Statistics about synchronizing two replica */ public class SyncStat { public final NodePair nodes; - public final long numberOfDifferences; + public final long numberOfDifferences; // TODO: revert to Range<Token> + public final List<SessionSummary> summaries; public SyncStat(NodePair nodes, long numberOfDifferences) { + this(nodes, numberOfDifferences, null); + } + + public SyncStat(NodePair nodes, long numberOfDifferences, List<SessionSummary> summaries) + { this.nodes = nodes; this.numberOfDifferences = numberOfDifferences; + this.summaries = summaries; + } + + public SyncStat withSummaries(List<SessionSummary> summaries) + { + return new SyncStat(nodes, numberOfDifferences, summaries); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/SyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java index 8adec6f..6f3157a 100644 --- a/src/java/org/apache/cassandra/repair/SyncTask.java +++ b/src/java/org/apache/cassandra/repair/SyncTask.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.MerkleTrees; @@ -39,14 +40,16 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna protected final RepairJobDesc desc; protected final TreeResponse r1; protected final TreeResponse r2; + protected final PreviewKind previewKind; protected volatile SyncStat stat; - public SyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2) + public SyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) { this.desc = desc; this.r1 = r1; this.r2 = r2; + this.previewKind = previewKind; } /** @@ -60,7 +63,7 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size()); // choose a repair method based on the significance of the difference - String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); + String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, desc.columnFamily); if (differences.isEmpty()) { logger.info(String.format(format, "are consistent")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/ValidationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java index 177ad3e..f68d3c5 100644 --- a/src/java/org/apache/cassandra/repair/ValidationTask.java +++ b/src/java/org/apache/cassandra/repair/ValidationTask.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.AbstractFuture; import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.ValidationRequest; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.MerkleTrees; /** @@ -35,12 +36,14 @@ public class ValidationTask extends AbstractFuture<TreeResponse> implements Runn private final RepairJobDesc desc; private final InetAddress endpoint; private final int gcBefore; + private final PreviewKind previewKind; - public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int gcBefore) + public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int gcBefore, PreviewKind previewKind) { this.desc = desc; this.endpoint = endpoint; this.gcBefore = gcBefore; + this.previewKind = previewKind; } /** @@ -61,7 +64,7 @@ public class ValidationTask extends AbstractFuture<TreeResponse> implements Runn { if (trees == null) { - setException(new RepairException(desc, "Validation failed in " + endpoint)); + setException(new RepairException(desc, previewKind, "Validation failed in " + endpoint)); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index e8e3621..ba1fa9d 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -37,6 +37,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.ValidationComplete; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; @@ -71,22 +72,25 @@ public class Validator implements Runnable // last key seen private DecoratedKey lastKey; - public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore) + private final PreviewKind previewKind; + + public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, PreviewKind previewKind) { - this(desc, initiator, gcBefore, false, false); + this(desc, initiator, gcBefore, false, false, previewKind); } - public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean isConsistent) + public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean isConsistent, PreviewKind previewKind) { - this(desc, initiator, gcBefore, false, isConsistent); + this(desc, initiator, gcBefore, false, isConsistent, previewKind); } - public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution, boolean isConsistent) + public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution, boolean isConsistent, PreviewKind previewKind) { this.desc = desc; this.initiator = initiator; this.gcBefore = gcBefore; this.isConsistent = isConsistent; + this.previewKind = previewKind; validated = 0; range = null; ranges = null; @@ -285,7 +289,7 @@ public class Validator implements Runnable // respond to the request that triggered this validation if (!initiator.equals(FBUtilities.getBroadcastAddress())) { - logger.info("[repair #{}] Sending completed merkle tree to {} for {}.{}", desc.sessionId, initiator, desc.keyspace, desc.columnFamily); + logger.info("{} Sending completed merkle tree to {} for {}.{}", previewKind.logPrefix(desc.sessionId), initiator, desc.keyspace, desc.columnFamily); Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily); } MessagingService.instance().sendOneWay(new ValidationComplete(desc, trees).createMessage(), initiator); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java new file mode 100644 index 0000000..015b558 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import com.google.common.collect.Lists; + +import org.apache.cassandra.repair.RepairResult; +import org.apache.cassandra.repair.RepairSessionResult; +import org.apache.cassandra.repair.SyncStat; +import org.apache.cassandra.streaming.SessionSummary; +import org.apache.cassandra.streaming.StreamSummary; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +import static com.google.common.collect.Iterables.filter; + +public class SyncStatSummary +{ + + private static class Session + { + final InetAddress src; + final InetAddress dst; + + int files = 0; + long bytes = 0; + long ranges = 0; + + Session(InetAddress src, InetAddress dst) + { + this.src = src; + this.dst = dst; + } + + void consumeSummary(StreamSummary summary) + { + files += summary.files; + bytes += summary.totalSize; + } + + void consumeSummaries(Collection<StreamSummary> summaries, long numRanges) + { + summaries.forEach(this::consumeSummary); + ranges += numRanges; + } + + public String toString() + { + return String.format("%s -> %s: %s ranges, %s sstables, %s bytes", src, dst, ranges, files, FBUtilities.prettyPrintMemory(bytes)); + } + } + + private static class Table + { + final String keyspace; + + final String table; + + int files = -1; + long bytes = -1; + int ranges = -1; + boolean totalsCalculated = false; + + final Map<Pair<InetAddress, InetAddress>, Session> sessions = new HashMap<>(); + + Table(String keyspace, String table) + { + this.keyspace = keyspace; + this.table = table; + } + + Session getOrCreate(InetAddress from, InetAddress to) + { + Pair<InetAddress, InetAddress> k = Pair.create(from, to); + if (!sessions.containsKey(k)) + { + sessions.put(k, new Session(from, to)); + } + return sessions.get(k); + } + + void consumeStat(SyncStat stat) + { + for (SessionSummary summary: stat.summaries) + { + getOrCreate(summary.coordinator, summary.peer).consumeSummaries(summary.sendingSummaries, stat.numberOfDifferences); + getOrCreate(summary.peer, summary.coordinator).consumeSummaries(summary.receivingSummaries, stat.numberOfDifferences); + } + } + + void consumeStats(List<SyncStat> stats) + { + filter(stats, s -> s.summaries != null).forEach(this::consumeStat); + } + + void calculateTotals() + { + files = 0; + bytes = 0; + ranges = 0; + for (Session session: sessions.values()) + { + files += session.files; + bytes += session.bytes; + ranges += session.ranges; + } + totalsCalculated = true; + } + + public String toString() + { + if (!totalsCalculated) + { + calculateTotals(); + } + StringBuilder output = new StringBuilder(); + + output.append(String.format("%s.%s - %s ranges, %s sstables, %s bytes\n", keyspace, table, ranges, files, FBUtilities.prettyPrintMemory(bytes))); + for (Session session: sessions.values()) + { + output.append(" ").append(session.toString()).append('\n'); + } + return output.toString(); + } + } + + private Map<Pair<String, String>, Table> summaries = new HashMap<>(); + private final boolean isEstimate; + + private int files = -1; + private long bytes = -1; + private int ranges = -1; + private boolean totalsCalculated = false; + + public SyncStatSummary(boolean isEstimate) + { + this.isEstimate = isEstimate; + } + + public void consumeRepairResult(RepairResult result) + { + Pair<String, String> cf = Pair.create(result.desc.keyspace, result.desc.columnFamily); + if (!summaries.containsKey(cf)) + { + summaries.put(cf, new Table(cf.left, cf.right)); + } + summaries.get(cf).consumeStats(result.stats); + } + + public void consumeSessionResults(List<RepairSessionResult> results) + { + if (results != null) + { + filter(results, Objects::nonNull).forEach(r -> filter(r.repairJobResults, Objects::nonNull).forEach(this::consumeRepairResult)); + } + } + + public boolean isEmpty() + { + calculateTotals(); + return files == 0 && bytes == 0 && ranges == 0; + } + + private void calculateTotals() + { + files = 0; + bytes = 0; + ranges = 0; + summaries.values().forEach(Table::calculateTotals); + for (Table table: summaries.values()) + { + table.calculateTotals(); + files += table.files; + bytes += table.bytes; + ranges += table.ranges; + } + totalsCalculated = true; + } + + public String toString() + { + List<Pair<String, String>> tables = Lists.newArrayList(summaries.keySet()); + tables.sort((o1, o2) -> + { + int ks = o1.left.compareTo(o2.left); + return ks != 0 ? ks : o1.right.compareTo(o2.right); + }); + + calculateTotals(); + + StringBuilder output = new StringBuilder(); + + if (isEstimate) + { + output.append(String.format("Total estimated streaming: %s ranges, %s sstables, %s bytes\n", ranges, files, FBUtilities.prettyPrintMemory(bytes))); + } + else + { + output.append(String.format("Total streaming: %s ranges, %s sstables, %s bytes\n", ranges, files, FBUtilities.prettyPrintMemory(bytes))); + } + + for (Pair<String, String> tableName: tables) + { + Table table = summaries.get(tableName); + output.append(table.toString()).append('\n'); + } + + return output.toString(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java index 9903114..4d59942 100644 --- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java @@ -31,6 +31,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.UUIDSerializer; @@ -44,8 +45,9 @@ public class PrepareMessage extends RepairMessage public final boolean isIncremental; public final long timestamp; public final boolean isGlobal; + public final PreviewKind previewKind; - public PrepareMessage(UUID parentRepairSession, List<TableId> tableIds, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal) + public PrepareMessage(UUID parentRepairSession, List<TableId> tableIds, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal, PreviewKind previewKind) { super(Type.PREPARE_MESSAGE, null); this.parentRepairSession = parentRepairSession; @@ -54,6 +56,7 @@ public class PrepareMessage extends RepairMessage this.isIncremental = isIncremental; this.timestamp = timestamp; this.isGlobal = isGlobal; + this.previewKind = previewKind; } @Override @@ -66,6 +69,7 @@ public class PrepareMessage extends RepairMessage parentRepairSession.equals(other.parentRepairSession) && isIncremental == other.isIncremental && isGlobal == other.isGlobal && + previewKind == other.previewKind && timestamp == other.timestamp && tableIds.equals(other.tableIds) && ranges.equals(other.ranges); @@ -74,7 +78,7 @@ public class PrepareMessage extends RepairMessage @Override public int hashCode() { - return Objects.hash(messageType, parentRepairSession, isGlobal, isIncremental, timestamp, tableIds, ranges); + return Objects.hash(messageType, parentRepairSession, isGlobal, previewKind, isIncremental, timestamp, tableIds, ranges); } public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage> @@ -94,6 +98,7 @@ public class PrepareMessage extends RepairMessage out.writeBoolean(message.isIncremental); out.writeLong(message.timestamp); out.writeBoolean(message.isGlobal); + out.writeInt(message.previewKind.getSerializationVal()); } public PrepareMessage deserialize(DataInputPlus in, int version) throws IOException @@ -110,7 +115,8 @@ public class PrepareMessage extends RepairMessage boolean isIncremental = in.readBoolean(); long timestamp = in.readLong(); boolean isGlobal = in.readBoolean(); - return new PrepareMessage(parentRepairSession, tableIds, ranges, isIncremental, timestamp, isGlobal); + PreviewKind previewKind = PreviewKind.deserialize(in.readInt()); + return new PrepareMessage(parentRepairSession, tableIds, ranges, isIncremental, timestamp, isGlobal, previewKind); } public long serializedSize(PrepareMessage message, int version) @@ -126,6 +132,7 @@ public class PrepareMessage extends RepairMessage size += TypeSizes.sizeof(message.isIncremental); size += TypeSizes.sizeof(message.timestamp); size += TypeSizes.sizeof(message.isGlobal); + size += TypeSizes.sizeof(message.previewKind.getSerializationVal()); return size; } }