Add repair streaming preview

Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13257


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4cfaf855
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4cfaf855
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4cfaf855

Branch: refs/heads/trunk
Commit: 4cfaf855c404256a9dd281d5066cc076232d72ff
Parents: 1e20d95
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Thu Feb 23 09:35:04 2017 -0800
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Mon Apr 24 09:21:33 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   6 +-
 doc/gen-nodetool-docs.py                        |   2 +-
 doc/source/operating/repair.rst                 |  87 ++++++-
 .../db/compaction/CompactionManager.java        |  24 +-
 .../org/apache/cassandra/dht/RangeStreamer.java |   3 +-
 .../cassandra/exceptions/RepairException.java   |  12 +-
 .../cassandra/io/sstable/SSTableLoader.java     |   2 +-
 .../net/IncomingStreamingConnection.java        |   2 +-
 .../apache/cassandra/repair/LocalSyncTask.java  |  16 +-
 .../apache/cassandra/repair/RemoteSyncTask.java |  17 +-
 .../org/apache/cassandra/repair/RepairJob.java  |  39 ++--
 .../apache/cassandra/repair/RepairJobDesc.java  |   6 +
 .../repair/RepairMessageVerbHandler.java        |  15 +-
 .../apache/cassandra/repair/RepairRunnable.java | 100 +++++++-
 .../apache/cassandra/repair/RepairSession.java  |  42 ++--
 .../cassandra/repair/StreamingRepairTask.java   |  15 +-
 .../org/apache/cassandra/repair/SyncStat.java   |  18 +-
 .../org/apache/cassandra/repair/SyncTask.java   |   7 +-
 .../apache/cassandra/repair/ValidationTask.java |   7 +-
 .../org/apache/cassandra/repair/Validator.java  |  16 +-
 .../repair/consistent/SyncStatSummary.java      | 233 +++++++++++++++++++
 .../repair/messages/PrepareMessage.java         |  13 +-
 .../cassandra/repair/messages/RepairOption.java |  41 +++-
 .../cassandra/repair/messages/SyncComplete.java |  40 +++-
 .../cassandra/repair/messages/SyncRequest.java  |  16 +-
 .../cassandra/service/ActiveRepairService.java  |  38 ++-
 .../cassandra/streaming/ConnectionHandler.java  |  23 +-
 .../apache/cassandra/streaming/PreviewKind.java |  76 ++++++
 .../apache/cassandra/streaming/SessionInfo.java |   7 +
 .../cassandra/streaming/SessionSummary.java     | 141 +++++++++++
 .../cassandra/streaming/StreamCoordinator.java  |   8 +-
 .../apache/cassandra/streaming/StreamPlan.java  |  11 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   3 +
 .../cassandra/streaming/StreamResultFuture.java |   9 +-
 .../cassandra/streaming/StreamSession.java      |  57 ++++-
 .../apache/cassandra/streaming/StreamState.java |   7 +
 .../streaming/messages/StreamInitMessage.java   |  10 +-
 .../apache/cassandra/tools/nodetool/Repair.java |  29 +++
 .../serialization/4.0/gms.EndpointState.bin     | Bin 0 -> 73 bytes
 test/data/serialization/4.0/gms.Gossip.bin      | Bin 0 -> 158 bytes
 .../serialization/4.0/service.SyncComplete.bin  | Bin 0 -> 538 bytes
 .../serialization/4.0/service.SyncRequest.bin   | Bin 0 -> 227 bytes
 .../4.0/service.ValidationComplete.bin          | Bin 0 -> 1251 bytes
 .../4.0/service.ValidationRequest.bin           | Bin 0 -> 167 bytes
 .../4.0/utils.EstimatedHistogram.bin            | Bin 0 -> 97500 bytes
 .../cassandra/AbstractSerializationsTester.java |   3 +-
 ...tionManagerGetSSTablesForValidationTest.java |  10 +-
 .../LeveledCompactionStrategyTest.java          |  12 +-
 .../cassandra/dht/StreamStateStoreTest.java     |   5 +-
 .../io/sstable/SSTableRewriterTest.java         |   5 +-
 .../cassandra/repair/AbstractRepairTest.java    |   4 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |  14 +-
 .../cassandra/repair/RepairSessionTest.java     |   6 +-
 .../repair/StreamingRepairTaskTest.java         |   9 +-
 .../apache/cassandra/repair/ValidatorTest.java  |   9 +-
 .../AbstractConsistentSessionTest.java          |   4 +-
 .../consistent/PendingAntiCompactionTest.java   |   3 +-
 .../RepairMessageSerializationsTest.java        |  17 +-
 .../service/ActiveRepairServiceTest.java        |  14 +-
 .../cassandra/service/SerializationsTest.java   |  23 +-
 .../cassandra/streaming/StreamSessionTest.java  |   3 +-
 .../streaming/StreamTransferTaskTest.java       |   6 +-
 63 files changed, 1156 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c82c8ee..fc06bde 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add repair streaming preview (CASSANDRA-13257)
  * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
  * Change protocol to allow sending key space independent of query string 
(CASSANDRA-10145)
  * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1ec4637..1c456b5 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -23,6 +23,8 @@ New features
    - Support for arithmetic operations between `timestamp`/`date` and 
`duration` has been added.
      See CASSANDRA-11936
    - Support for arithmetic operations on number has been added. See 
CASSANDRA-11935
+   - Preview expected streaming required for a repair (nodetool repair 
--preview), and validate the
+     consistency of repaired data between nodes (nodetool repair --validate). 
See CASSANDRA-13257
 
 Upgrading
 ---------
@@ -46,7 +48,9 @@ Upgrading
       data to be inconsistent between nodes. The fix changes the behavior of 
both
       full and incremental repairs. For full repairs, data is no longer marked
       repaired. For incremental repairs, anticompaction is run at the beginning
-      of the repair, instead of at the end.
+      of the repair, instead of at the end. If incremental repair was being 
used
+      prior to upgrading, a full repair should be run after upgrading to 
resolve
+      any inconsistencies.
     - Config option index_interval has been removed (it was deprecated since 
2.0)
     - Deprecated repair JMX APIs are removed.
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/doc/gen-nodetool-docs.py
----------------------------------------------------------------------
diff --git a/doc/gen-nodetool-docs.py b/doc/gen-nodetool-docs.py
index ae01534..e3862f7 100644
--- a/doc/gen-nodetool-docs.py
+++ b/doc/gen-nodetool-docs.py
@@ -27,7 +27,7 @@ nodetool = "../bin/nodetool"
 outdir = "source/tools/nodetool"
 helpfilename = outdir + "/nodetool.txt"
 command_re = re.compile("(    )([_a-z]+)")
-commandRSTContent = ".. _{0}\n\n{0}\n-------\n\nUsage\n---------\n\n.. 
include:: {0}.txt\n  :literal:\n\n"
+commandRSTContent = ".. 
_nodetool_{0}:\n\n{0}\n-------\n\nUsage\n---------\n\n.. include:: {0}.txt\n  
:literal:\n\n"
 
 # create the documentation directory
 if not os.path.exists(outdir):

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/doc/source/operating/repair.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/repair.rst b/doc/source/operating/repair.rst
index 97d8ce8..97115dc 100644
--- a/doc/source/operating/repair.rst
+++ b/doc/source/operating/repair.rst
@@ -16,7 +16,92 @@
 
 .. highlight:: none
 
+.. _repair:
+
 Repair
 ------
 
-.. todo:: todo
+Cassandra is designed to remain available if one of its nodes is down or 
unreachable. However, when a node is down or
+unreachable, it needs to eventually discover the writes it missed. Hints 
attempt to inform a node of missed writes, but
+are a best effort, and aren't guaranteed to inform a node of 100% of the 
writes it missed. These inconsistencies can
+eventually result in data loss as nodes are replaced or tombstones expire.
+
+These inconsistencies are fixed with the repair process. Repair synchronizes 
the data between nodes by comparing their
+respective datasets for their common token ranges, and streaming the 
differences for any out of sync sections between
+the nodes. It compares the data with merkle trees, which are a hierarchy of 
hashes.
+
+Incremental and Full Repairs
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+There are 2 types of repairs: full repairs, and incremental repairs. Full 
repairs operate over all of the data in the
+token range being repaired. Incremental repairs only repair data that's been 
written since the previous incremental repair.
+
+Incremental repairs are the default repair type, and if run regularly, can 
significantly reduce the time and io cost of
+performing a repair. However, it's important to understand that once an 
incremental repair marks data as repaired, it won't
+try to repair it again. This is fine for syncing up missed writes, but it 
doesn't protect against things like disk corruption,
+data loss by operator error, or bugs in Cassandra. For this reason, full 
repairs should still be run occasionally.
+
+Usage and Best Practices
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Since repair can result in a lot of disk and network io, it's not run 
automatically by Cassandra. It is run by the operator
+via nodetool.
+
+Incremental repair is the default and is run with the following command:
+
+::
+
+    nodetool repair
+
+A full repair can be run with the following command:
+
+::
+
+    nodetool repair --full
+
+Additionally, repair can be run on a single keyspace:
+
+::
+
+    nodetool repair [options] <keyspace_name>
+
+Or even on specific tables:
+
+::
+
+    nodetool repair [options] <keyspace_name> <table1> <table2>
+
+
+The repair command only repairs token ranges on the node being repaired; it 
doesn't repair the whole cluster. By default, repair
+will operate on all token ranges replicated by the node you're running repair 
on, which will cause duplicate work if you run it
+on every node. The ``-pr`` flag will only repair the "primary" ranges on a 
node, so you can repair your entire cluster by running
+``nodetool repair -pr`` on each node in a single datacenter.
+
+The specific frequency of repair that's right for your cluster, of course, 
depends on several factors. However, if you're
+just starting out and looking for somewhere to start, running an incremental 
repair every 1-3 days, and a full repair every
+1-3 weeks is probably reasonable. If you don't want to run incremental 
repairs, a full repair every 5 days is a good place
+to start.
+
+At a minimum, repair should be run often enough that the gc grace period never 
expires on unrepaired data. Otherwise, deleted
+data could reappear. With a default gc grace period of 10 days, repairing 
every node in your cluster at least once every 7 days
+will prevent this, while providing enough slack to allow for delays.
+
+Other Options
+^^^^^^^^^^^^^
+
+``-pr, --partitioner-range``
+    Restricts repair to the 'primary' token ranges of the node being repaired. 
A primary range is just a token range for
+    which a node is the first replica in the ring.
+
+``-prv, --preview``
+    Estimates the amount of streaming that would occur for the given repair 
command. This builds the merkle trees, and prints
+    the expected streaming activity, but does not actually do any streaming. 
By default, incremental repairs are estimated;
+    add the ``--full`` flag to estimate a full repair.
+
+``-vd, --validate``
+    Verifies that the repaired data is the same across all nodes. Similar to 
``--preview``, this builds and compares merkle
+    trees of repaired data, but doesn't do any streaming. This is useful for 
troubleshooting. If this shows that the repaired
+    data is out of sync, a full repair should be run.
+
+.. seealso::
+    :ref:`nodetool repair docs <nodetool_repair>`

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a3ca86a..6f311a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -29,6 +29,7 @@ import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
@@ -69,6 +70,7 @@ import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -611,8 +613,11 @@ public class CompactionManager implements 
CompactionManagerMBean
                                       UUID pendingRepair,
                                       UUID parentRepairSession) throws 
InterruptedException, IOException
     {
-        logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} 
sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), 
validatedForRepair.size(), cfs.getLiveSSTables());
-        logger.trace("[repair #{}] Starting anticompaction for ranges {}", 
parentRepairSession, ranges);
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(parentRepairSession);
+        Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for 
previews");
+
+        logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", 
PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), 
cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables());
+        logger.trace("{} Starting anticompaction for ranges {}", 
PreviewKind.NONE.logPrefix(parentRepairSession), ranges);
         Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
         // we should only notify that repair status changed if it actually did:
@@ -640,7 +645,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                 {
                     if (r.contains(sstableRange))
                     {
-                        logger.info("[repair #{}] SSTable {} fully contained 
in range {}, mutating repairedAt instead of anticompacting", 
parentRepairSession, sstable, r);
+                        logger.info("{} SSTable {} fully contained in range 
{}, mutating repairedAt instead of anticompacting", 
PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r);
                         
sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 
repairedAt, pendingRepair);
                         sstable.reloadSSTableMetadata();
                         mutatedRepairStatuses.add(sstable);
@@ -652,14 +657,14 @@ public class CompactionManager implements 
CompactionManagerMBean
                     }
                     else if (sstableRange.intersects(r))
                     {
-                        logger.info("[repair #{}] SSTable {} ({}) will be 
anticompacted on range {}", parentRepairSession, sstable, sstableRange, r);
+                        logger.info("{} SSTable {} ({}) will be anticompacted 
on range {}", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, 
sstableRange, r);
                         shouldAnticompact = true;
                     }
                 }
 
                 if (!shouldAnticompact)
                 {
-                    logger.info("[repair #{}] SSTable {} ({}) does not 
intersect repaired ranges {}, not touching repairedAt.", parentRepairSession, 
sstable, sstableRange, normalizedRanges);
+                    logger.info("{} SSTable {} ({}) does not intersect 
repaired ranges {}, not touching repairedAt.", 
PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableRange, 
normalizedRanges);
                     nonAnticompacting.add(sstable);
                     sstableIterator.remove();
                 }
@@ -678,7 +683,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             txn.close();
         }
 
-        logger.info("[repair #{}] Completed anticompaction successfully", 
parentRepairSession);
+        logger.info("{} Completed anticompaction successfully", 
PreviewKind.NONE.logPrefix(parentRepairSession));
     }
 
     public void performMaximal(final ColumnFamilyStore cfStore, boolean 
splitOutput)
@@ -1427,7 +1432,12 @@ public class CompactionManager implements 
CompactionManagerMBean
         Set<SSTableReader> sstablesToValidate = new HashSet<>();
 
         com.google.common.base.Predicate<SSTableReader> predicate;
-        if (validator.isConsistent)
+        if (prs.isPreview())
+        {
+            predicate = prs.getPreviewPredicate();
+
+        }
+        else if (validator.isConsistent)
         {
             predicate = s -> 
validator.desc.parentSessionId.equals(s.getSSTableMetadata().pendingRepair);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java 
b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index fd976c9..85c2307 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -39,6 +39,7 @@ import 
org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamOperation;
@@ -156,7 +157,7 @@ public class RangeStreamer
         this.tokens = tokens;
         this.address = address;
         this.description = streamOperation.getDescription();
-        this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, 
true, connectSequentially, null);
+        this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, 
true, connectSequentially, null, PreviewKind.NONE);
         this.useStrictConsistency = useStrictConsistency;
         this.snitch = snitch;
         this.stateStore = stateStore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/exceptions/RepairException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/RepairException.java 
b/src/java/org/apache/cassandra/exceptions/RepairException.java
index 2f5f2c1..db219a2 100644
--- a/src/java/org/apache/cassandra/exceptions/RepairException.java
+++ b/src/java/org/apache/cassandra/exceptions/RepairException.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.exceptions;
 
 import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.streaming.PreviewKind;
 
 /**
  * Exception thrown during repair
@@ -25,22 +26,23 @@ import org.apache.cassandra.repair.RepairJobDesc;
 public class RepairException extends Exception
 {
     public final RepairJobDesc desc;
+    public final PreviewKind previewKind;
 
     public RepairException(RepairJobDesc desc, String message)
     {
-        super(message);
-        this.desc = desc;
+        this(desc, null, message);
     }
 
-    public RepairException(RepairJobDesc desc, String message, Throwable cause)
+    public RepairException(RepairJobDesc desc, PreviewKind previewKind, String 
message)
     {
-        super(message, cause);
+        super(message);
         this.desc = desc;
+        this.previewKind = previewKind != null ? previewKind : 
PreviewKind.NONE;
     }
 
     @Override
     public String getMessage()
     {
-        return desc + " " + super.getMessage();
+        return desc.toString(previewKind) + ' ' + super.getMessage();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index e9ea35a..ff47bec 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, 
connectionsPerHost, false, false, 
null).connectionFactory(client.getConnectionFactory());
+        StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, 
connectionsPerHost, false, false, null, 
PreviewKind.NONE).connectionFactory(client.getConnectionFactory());
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = 
client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java 
b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index e5fdc99..8db5fcb 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -73,7 +73,7 @@ public class IncomingStreamingConnection extends Thread 
implements Closeable
             // The receiving side distinguish two connections by looking at 
StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing 
streams because we want to
             // parallelize said streams and the socket is blocking, so we 
might deadlock.
-            StreamResultFuture.initReceivingSide(init.sessionIndex, 
init.planId, init.streamOperation, init.from, this, init.isForOutgoing, 
version, init.keepSSTableLevel, init.pendingRepair);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, 
init.planId, init.streamOperation, init.from, this, init.isForOutgoing, 
version, init.keepSSTableLevel, init.pendingRepair, init.previewKind);
         }
         catch (Throwable t)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java 
b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 06b1661..5bb66dc 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
@@ -38,6 +39,8 @@ import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
 
 /**
  * LocalSyncTask performs streaming between local(coordinator) node and remote 
replica.
@@ -51,9 +54,9 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
     private final UUID pendingRepair;
     private final boolean pullRepair;
 
-    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, 
UUID pendingRepair, boolean pullRepair)
+    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, 
UUID pendingRepair, boolean pullRepair, PreviewKind previewKind)
     {
-        super(desc, r1, r2);
+        super(desc, r1, r2, previewKind);
         this.pendingRepair = pendingRepair;
         this.pullRepair = pullRepair;
     }
@@ -62,7 +65,7 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
     @VisibleForTesting
     StreamPlan createStreamPlan(InetAddress dst, InetAddress preferred, 
List<Range<Token>> differences)
     {
-        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, 
false, pendingRepair)
+        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, 
false, pendingRepair, previewKind)
                           .listeners(this)
                           .flushBeforeTransfer(pendingRepair == null)
                           .requestRanges(dst, preferred, desc.keyspace, 
differences, desc.columnFamily);  // request ranges from the remote node
@@ -79,6 +82,7 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
      * Starts sending/receiving our list of differences to/from the remote 
endpoint: creates a callback
      * that will be called out of band once the streams complete.
      */
+    @Override
     protected void startSync(List<Range<Token>> differences)
     {
         InetAddress local = FBUtilities.getBroadcastAddress();
@@ -87,7 +91,7 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
         InetAddress preferred = SystemKeyspace.getPreferredIP(dst);
 
         String message = String.format("Performing streaming repair of %d 
ranges with %s", differences.size(), dst);
-        logger.info("[repair #{}] {}", desc.sessionId, message);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
 
         createStreamPlan(dst, preferred, differences).execute();
@@ -122,9 +126,9 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
     public void onSuccess(StreamState result)
     {
         String message = String.format("Sync complete using session %s between 
%s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
-        logger.info("[repair #{}] {}", desc.sessionId, message);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
-        set(stat);
+        set(stat.withSummaries(result.createSummaries()));
     }
 
     public void onFailure(Throwable t)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java 
b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
index ededc40..a26a392 100644
--- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
@@ -28,6 +28,8 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -41,30 +43,31 @@ public class RemoteSyncTask extends SyncTask
 {
     private static final Logger logger = 
LoggerFactory.getLogger(RemoteSyncTask.class);
 
-    public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2)
+    public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse 
r2, PreviewKind previewKind)
     {
-        super(desc, r1, r2);
+        super(desc, r1, r2, previewKind);
     }
 
+    @Override
     protected void startSync(List<Range<Token>> differences)
     {
         InetAddress local = FBUtilities.getBroadcastAddress();
-        SyncRequest request = new SyncRequest(desc, local, r1.endpoint, 
r2.endpoint, differences);
+        SyncRequest request = new SyncRequest(desc, local, r1.endpoint, 
r2.endpoint, differences, previewKind);
         String message = String.format("Forwarding streaming repair of %d 
ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, 
request.dst);
-        logger.info("[repair #{}] {}", desc.sessionId, message);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
         MessagingService.instance().sendOneWay(request.createMessage(), 
request.src);
     }
 
-    public void syncComplete(boolean success)
+    public void syncComplete(boolean success, List<SessionSummary> summaries)
     {
         if (success)
         {
-            set(stat);
+            set(stat.withSummaries(summaries));
         }
         else
         {
-            setException(new RepairException(desc, String.format("Sync failed 
between %s and %s", r1.endpoint, r2.endpoint)));
+            setException(new RepairException(desc, previewKind, 
String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint)));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index d1f6a94..d6c1176 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -42,6 +43,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
     private final RepairParallelism parallelismDegree;
     private final ListeningExecutorService taskExecutor;
     private final boolean isConsistent;
+    private final PreviewKind previewKind;
 
     /**
      * Create repair job to run on specific columnfamily
@@ -49,13 +51,14 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
      * @param session RepairSession that this RepairJob belongs
      * @param columnFamily name of the ColumnFamily to repair
      */
-    public RepairJob(RepairSession session, String columnFamily, boolean 
isConsistent)
+    public RepairJob(RepairSession session, String columnFamily, boolean 
isConsistent, PreviewKind previewKind)
     {
         this.session = session;
         this.desc = new RepairJobDesc(session.parentRepairSession, 
session.getId(), session.keyspace, columnFamily, session.getRanges());
         this.taskExecutor = session.taskExecutor;
         this.parallelismDegree = session.parallelismDegree;
         this.isConsistent = isConsistent;
+        this.previewKind = previewKind;
     }
 
     /**
@@ -128,11 +131,11 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
                         SyncTask task;
                         if (r1.endpoint.equals(local) || 
r2.endpoint.equals(local))
                         {
-                            task = new LocalSyncTask(desc, r1, r2, 
isConsistent ? desc.parentSessionId : null, session.pullRepair);
+                            task = new LocalSyncTask(desc, r1, r2, 
isConsistent ? desc.parentSessionId : null, session.pullRepair, 
session.previewKind);
                         }
                         else
                         {
-                            task = new RemoteSyncTask(desc, r1, r2);
+                            task = new RemoteSyncTask(desc, r1, r2, 
session.previewKind);
                             // RemoteSyncTask expects SyncComplete message 
sent back.
                             // Register task to RepairSession to receive 
response.
                             session.waitForSync(Pair.create(desc, new 
NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task);
@@ -150,8 +153,11 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
         {
             public void onSuccess(List<SyncStat> stats)
             {
-                logger.info("[repair #{}] {} is fully synced", 
session.getId(), desc.columnFamily);
-                SystemDistributedKeyspace.successfulRepairJob(session.getId(), 
desc.keyspace, desc.columnFamily);
+                if (!previewKind.isPreview())
+                {
+                    logger.info("{} {} is fully synced", 
previewKind.logPrefix(session.getId()), desc.columnFamily);
+                    
SystemDistributedKeyspace.successfulRepairJob(session.getId(), desc.keyspace, 
desc.columnFamily);
+                }
                 set(new RepairResult(desc, stats));
             }
 
@@ -160,8 +166,11 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
              */
             public void onFailure(Throwable t)
             {
-                logger.warn("[repair #{}] {} sync failed", session.getId(), 
desc.columnFamily);
-                SystemDistributedKeyspace.failedRepairJob(session.getId(), 
desc.keyspace, desc.columnFamily, t);
+                if (!previewKind.isPreview())
+                {
+                    logger.warn("{} {} sync failed", 
previewKind.logPrefix(session.getId()), desc.columnFamily);
+                    SystemDistributedKeyspace.failedRepairJob(session.getId(), 
desc.keyspace, desc.columnFamily, t);
+                }
                 setException(t);
             }
         }, taskExecutor);
@@ -179,13 +188,13 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
     private ListenableFuture<List<TreeResponse>> 
sendValidationRequest(Collection<InetAddress> endpoints)
     {
         String message = String.format("Requesting merkle trees for %s (to 
%s)", desc.columnFamily, endpoints);
-        logger.info("[repair #{}] {}", desc.sessionId, message);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
         int gcBefore = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
         List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
         for (InetAddress endpoint : endpoints)
         {
-            ValidationTask task = new ValidationTask(desc, endpoint, gcBefore);
+            ValidationTask task = new ValidationTask(desc, endpoint, gcBefore, 
previewKind);
             tasks.add(task);
             session.waitForValidation(Pair.create(desc, endpoint), task);
             taskExecutor.execute(task);
@@ -199,14 +208,14 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
     private ListenableFuture<List<TreeResponse>> 
sendSequentialValidationRequest(Collection<InetAddress> endpoints)
     {
         String message = String.format("Requesting merkle trees for %s (to 
%s)", desc.columnFamily, endpoints);
-        logger.info("[repair #{}] {}", desc.sessionId, message);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
         int gcBefore = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
         List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
 
         Queue<InetAddress> requests = new LinkedList<>(endpoints);
         InetAddress address = requests.poll();
-        ValidationTask firstTask = new ValidationTask(desc, address, gcBefore);
+        ValidationTask firstTask = new ValidationTask(desc, address, gcBefore, 
previewKind);
         logger.info("Validating {}", address);
         session.waitForValidation(Pair.create(desc, address), firstTask);
         tasks.add(firstTask);
@@ -214,7 +223,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
         while (requests.size() > 0)
         {
             final InetAddress nextAddress = requests.poll();
-            final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, gcBefore);
+            final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, gcBefore, previewKind);
             tasks.add(nextTask);
             Futures.addCallback(currentTask, new FutureCallback<TreeResponse>()
             {
@@ -241,7 +250,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
     private ListenableFuture<List<TreeResponse>> 
sendDCAwareValidationRequest(Collection<InetAddress> endpoints)
     {
         String message = String.format("Requesting merkle trees for %s (to 
%s)", desc.columnFamily, endpoints);
-        logger.info("[repair #{}] {}", desc.sessionId, message);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
         int gcBefore = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
         List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
@@ -263,7 +272,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
         {
             Queue<InetAddress> requests = entry.getValue();
             InetAddress address = requests.poll();
-            ValidationTask firstTask = new ValidationTask(desc, address, 
gcBefore);
+            ValidationTask firstTask = new ValidationTask(desc, address, 
gcBefore, previewKind);
             logger.info("Validating {}", address);
             session.waitForValidation(Pair.create(desc, address), firstTask);
             tasks.add(firstTask);
@@ -271,7 +280,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
             while (requests.size() > 0)
             {
                 final InetAddress nextAddress = requests.poll();
-                final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, gcBefore);
+                final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, gcBefore, previewKind);
                 tasks.add(nextTask);
                 Futures.addCallback(currentTask, new 
FutureCallback<TreeResponse>()
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java 
b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index be3daef..8f70e00 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
@@ -66,6 +67,11 @@ public class RepairJobDesc
         return "[repair #" + sessionId + " on " + keyspace + "/" + 
columnFamily + ", " + ranges + "]";
     }
 
+    public String toString(PreviewKind previewKind)
+    {
+        return '[' + previewKind.logPrefix() + " #" + sessionId + " on " + 
keyspace + "/" + columnFamily + ", " + ranges + "]";
+    }
+
     @Override
     public boolean equals(Object o)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index b6b9f87..ed62229 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
 
 /**
  * Handles all repair related message.
@@ -50,6 +51,12 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
         return 
ActiveRepairService.instance.consistent.local.isSessionInProgress(sessionID);
     }
 
+    private PreviewKind previewKind(UUID sessionID)
+    {
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionID);
+        return prs != null ? prs.previewKind : PreviewKind.NONE;
+    }
+
     public void doVerb(final MessageIn<RepairMessage> message, final int id)
     {
         // TODO add cancel/interrupt message
@@ -79,7 +86,8 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                                                                              
prepareMessage.ranges,
                                                                              
prepareMessage.isIncremental,
                                                                              
prepareMessage.timestamp,
-                                                                             
prepareMessage.isGlobal);
+                                                                             
prepareMessage.isGlobal,
+                                                                             
prepareMessage.previewKind);
                     MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                     break;
 
@@ -127,7 +135,8 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                     }
 
                     
ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId);
-                    Validator validator = new Validator(desc, message.from, 
validationRequest.gcBefore, isConsistent(desc.parentSessionId));
+                    Validator validator = new Validator(desc, message.from, 
validationRequest.gcBefore,
+                                                        
isConsistent(desc.parentSessionId), previewKind(desc.parentSessionId));
                     CompactionManager.instance.submitValidation(store, 
validator);
                     break;
 
@@ -135,7 +144,7 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                     // forwarded sync request
                     SyncRequest request = (SyncRequest) message.payload;
                     logger.debug("Syncing {}", request);
-                    StreamingRepairTask task = new StreamingRepairTask(desc, 
request, isConsistent(desc.parentSessionId) ? desc.parentSessionId : null);
+                    StreamingRepairTask task = new StreamingRepairTask(desc, 
request, isConsistent(desc.parentSessionId) ? desc.parentSessionId : null, 
request.previewKind);
                     task.run();
                     break;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index b8bef95..354d8b5 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.repair.consistent.SyncStatSummary;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -50,6 +51,7 @@ import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.TraceKeyspace;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
@@ -218,7 +220,11 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
             cfnames[i] = columnFamilyStores.get(i).name;
         }
 
-        SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, 
cfnames, options);
+        if (!options.isPreview())
+        {
+            SystemDistributedKeyspace.startParentRepair(parentSession, 
keyspace, cfnames, options);
+        }
+
         long repairedAt;
         try
         {
@@ -228,12 +234,19 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         }
         catch (Throwable t)
         {
-            SystemDistributedKeyspace.failParentRepair(parentSession, t);
+            if (!options.isPreview())
+            {
+                SystemDistributedKeyspace.failParentRepair(parentSession, t);
+            }
             fireErrorAndComplete(tag, progress.get(), totalProgress, 
t.getMessage());
             return;
         }
 
-        if (options.isIncremental())
+        if (options.isPreview())
+        {
+            previewRepair(parentSession, repairedAt, startTime, traceState, 
allNeighbors, commonRanges, cfnames);
+        }
+        else if (options.isIncremental())
         {
             consistentRepair(parentSession, repairedAt, startTime, traceState, 
allNeighbors, commonRanges, cfnames);
         }
@@ -311,6 +324,76 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         Futures.addCallback(repairResult, new 
RepairCompleteCallback(parentSession, ranges, startTime, traceState, 
hasFailure, executor));
     }
 
+    private void previewRepair(UUID parentSession,
+                               long repairedAt,
+                               long startTime,
+                               TraceState traceState,
+                               Set<InetAddress> allNeighbors,
+                               List<Pair<Set<InetAddress>, ? extends 
Collection<Range<Token>>>> commonRanges,
+                               String... cfnames)
+    {
+
+        logger.debug("Starting preview repair for {}", parentSession);
+        // Set up RepairJob executor for this repair command.
+        ListeningExecutorService executor = createExecutor();
+
+        final ListenableFuture<List<RepairSessionResult>> allSessions = 
submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
+
+        Futures.addCallback(allSessions, new 
FutureCallback<List<RepairSessionResult>>()
+        {
+            public void onSuccess(List<RepairSessionResult> results)
+            {
+                try
+                {
+                    PreviewKind previewKind = options.getPreviewKind();
+                    assert previewKind != PreviewKind.NONE;
+                    SyncStatSummary summary = new SyncStatSummary(true);
+                    summary.consumeSessionResults(results);
+
+                    if (summary.isEmpty())
+                    {
+                        String message = previewKind == PreviewKind.REPAIRED ? 
"Repaired data is in sync" : "Previewed data was in sync";
+                        logger.info(message);
+                        fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, 
message));
+                    }
+                    else
+                    {
+                        String message =  (previewKind == PreviewKind.REPAIRED 
? "Repaired data is inconsistent\n" : "Preview complete\n") + 
summary.toString();
+                        logger.info(message);
+                        fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, 
message));
+                    }
+
+                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
+                                                             "Repair preview 
completed successfully"));
+                    complete();
+                }
+                catch (Throwable t)
+                {
+                    logger.error("Error completing preview repair", t);
+                    onFailure(t);
+                }
+            }
+
+            public void onFailure(Throwable t)
+            {
+                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, 
t.getMessage()));
+                logger.error("Error completing preview repair", t);
+                complete();
+            }
+
+            private void complete()
+            {
+                logger.debug("Preview repair {} completed", parentSession);
+
+                String duration = 
DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
+                                                                          
true, true);
+                String message = String.format("Repair preview #%d finished in 
%s", cmd, duration);
+                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, 
message));
+                executor.shutdownNow();
+            }
+        });
+    }
+
     private ListenableFuture<List<RepairSessionResult>> 
submitRepairSessions(UUID parentSession,
                                                                              
boolean isConsistent,
                                                                              
ListeningExecutorService executor,
@@ -327,6 +410,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                                                                                
      p.left,
                                                                                
      isConsistent,
                                                                                
      options.isPullRepair(),
+                                                                               
      options.getPreviewKind(),
                                                                                
      executor,
                                                                                
      cfnames);
             if (session == null)
@@ -405,7 +489,10 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
 
         public void onSuccess(Object result)
         {
-            SystemDistributedKeyspace.successfulParentRepair(parentSession, 
successfulRanges);
+            if (!options.isPreview())
+            {
+                
SystemDistributedKeyspace.successfulParentRepair(parentSession, 
successfulRanges);
+            }
             if (hasFailure.get())
             {
                 fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
@@ -422,7 +509,10 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         public void onFailure(Throwable t)
         {
             fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, 
progress.get(), totalProgress, t.getMessage()));
-            SystemDistributedKeyspace.failParentRepair(parentSession, t);
+            if (!options.isPreview())
+            {
+                SystemDistributedKeyspace.failParentRepair(parentSession, t);
+            }
             repairComplete();
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 2aa068c..8aa9590 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -34,6 +34,8 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTrees;
@@ -91,6 +93,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
     public final Collection<Range<Token>> ranges;
     public final Set<InetAddress> endpoints;
     public final boolean isConsistent;
+    public final PreviewKind previewKind;
 
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
@@ -125,6 +128,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
                          Set<InetAddress> endpoints,
                          boolean isConsistent,
                          boolean pullRepair,
+                         PreviewKind previewKind,
                          String... cfnames)
     {
         assert cfnames.length > 0 : "Repairing no column families seems 
pointless, doesn't it";
@@ -137,6 +141,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         this.ranges = ranges;
         this.endpoints = endpoints;
         this.isConsistent = isConsistent;
+        this.previewKind = previewKind;
         this.pullRepair = pullRepair;
     }
 
@@ -177,7 +182,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         }
 
         String message = String.format("Received merkle tree for %s from %s", 
desc.columnFamily, endpoint);
-        logger.info("[repair #{}] {}", getId(), message);
+        logger.info("{} {}", previewKind.logPrefix(getId()), message);
         Tracing.traceRepair(message);
         task.treesReceived(trees);
     }
@@ -189,7 +194,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
      * @param nodes nodes that completed sync
      * @param success true if sync succeeded
      */
-    public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean 
success)
+    public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean 
success, List<SessionSummary> summaries)
     {
         RemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
         if (task == null)
@@ -198,8 +203,8 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
             return;
         }
 
-        logger.debug("[repair #{}] Repair completed between {} and {} on {}", 
getId(), nodes.endpoint1, nodes.endpoint2, desc.columnFamily);
-        task.syncComplete(success);
+        logger.debug("{} Repair completed between {} and {} on {}", 
previewKind.logPrefix(getId()), nodes.endpoint1, nodes.endpoint2, 
desc.columnFamily);
+        task.syncComplete(success, summaries);
     }
 
     private String repairedNodes()
@@ -225,16 +230,22 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         if (terminated)
             return;
 
-        logger.info("[repair #{}] new session: will sync {} on range {} for 
{}.{}", getId(), repairedNodes(), ranges, keyspace, Arrays.toString(cfnames));
+        logger.info("{} new session: will sync {} on range {} for {}.{}", 
previewKind.logPrefix(getId()), repairedNodes(), ranges, keyspace, 
Arrays.toString(cfnames));
         Tracing.traceRepair("Syncing range {}", ranges);
-        SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, 
keyspace, cfnames, ranges, endpoints);
+        if (!previewKind.isPreview())
+        {
+            SystemDistributedKeyspace.startRepairs(getId(), 
parentRepairSession, keyspace, cfnames, ranges, endpoints);
+        }
 
         if (endpoints.isEmpty())
         {
-            logger.info("[repair #{}] {}", getId(), message = 
String.format("No neighbors to repair with on range %s: session completed", 
ranges));
+            logger.info("{} {}", previewKind.logPrefix(getId()), message = 
String.format("No neighbors to repair with on range %s: session completed", 
ranges));
             Tracing.traceRepair(message);
             set(new RepairSessionResult(id, keyspace, ranges, 
Lists.<RepairResult>newArrayList()));
-            SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, 
new RuntimeException(message));
+            if (!previewKind.isPreview())
+            {
+                SystemDistributedKeyspace.failRepairs(getId(), keyspace, 
cfnames, new RuntimeException(message));
+            }
             return;
         }
 
@@ -244,10 +255,13 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
             if (!FailureDetector.instance.isAlive(endpoint))
             {
                 message = String.format("Cannot proceed on repair because a 
neighbor (%s) is dead: session failed", endpoint);
-                logger.error("[repair #{}] {}", getId(), message);
+                logger.error("{} {}", previewKind.logPrefix(getId()), message);
                 Exception e = new IOException(message);
                 setException(e);
-                SystemDistributedKeyspace.failRepairs(getId(), keyspace, 
cfnames, e);
+                if (!previewKind.isPreview())
+                {
+                    SystemDistributedKeyspace.failRepairs(getId(), keyspace, 
cfnames, e);
+                }
                 return;
             }
         }
@@ -256,7 +270,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         List<ListenableFuture<RepairResult>> jobs = new 
ArrayList<>(cfnames.length);
         for (String cfname : cfnames)
         {
-            RepairJob job = new RepairJob(this, cfname, isConsistent);
+            RepairJob job = new RepairJob(this, cfname, isConsistent, 
previewKind);
             executor.execute(job);
             jobs.add(job);
         }
@@ -267,7 +281,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
             public void onSuccess(List<RepairResult> results)
             {
                 // this repair session is completed
-                logger.info("[repair #{}] {}", getId(), "Session completed 
successfully");
+                logger.info("{} {}", previewKind.logPrefix(getId()), "Session 
completed successfully");
                 Tracing.traceRepair("Completed sync of range {}", ranges);
                 set(new RepairSessionResult(id, keyspace, ranges, results));
 
@@ -278,7 +292,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
 
             public void onFailure(Throwable t)
             {
-                logger.error(String.format("[repair #%s] Session completed 
with the following error", getId()), t);
+                logger.error("{} Session completed with the following error", 
previewKind.logPrefix(getId()), t);
                 Tracing.traceRepair("Session completed with the following 
error: {}", t);
                 forceShutdown(t);
             }
@@ -335,7 +349,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
             return;
 
         Exception exception = new IOException(String.format("Endpoint %s 
died", endpoint));
-        logger.error(String.format("[repair #%s] session completed with the 
following error", getId()), exception);
+        logger.error("{} session completed with the following error", 
previewKind.logPrefix(getId()), exception);
         // If a node failed, we stop everything (though there could still be 
some activity in the background)
         forceShutdown(exception);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java 
b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 7042de1..f43010b 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
 import java.util.UUID;
+import java.util.Collections;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -28,7 +29,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.repair.messages.SyncRequest;
-import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
 import org.apache.cassandra.streaming.StreamPlan;
@@ -46,12 +47,14 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
     private final RepairJobDesc desc;
     private final SyncRequest request;
     private final UUID pendingRepair;
+    private final PreviewKind previewKind;
 
-    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, UUID 
pendingRepair)
+    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, UUID 
pendingRepair, PreviewKind previewKind)
     {
         this.desc = desc;
         this.request = request;
         this.pendingRepair = pendingRepair;
+        this.previewKind = previewKind;
     }
 
     public void run()
@@ -65,7 +68,7 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
     @VisibleForTesting
     StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred)
     {
-        return new StreamPlan(StreamOperation.REPAIR, 1, false, false, 
pendingRepair)
+        return new StreamPlan(StreamOperation.REPAIR, 1, false, false, 
pendingRepair, previewKind)
                .listeners(this)
                .flushBeforeTransfer(pendingRepair == null) // sstables are 
isolated at the beginning of an incremental repair session, so flushing isn't 
neccessary
                .requestRanges(dest, preferred, desc.keyspace, request.ranges, 
desc.columnFamily) // request ranges from the remote node
@@ -83,8 +86,8 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
      */
     public void onSuccess(StreamState state)
     {
-        logger.info("[repair #{}] streaming task succeed, returning response 
to {}", desc.sessionId, request.initiator);
-        MessagingService.instance().sendOneWay(new SyncComplete(desc, 
request.src, request.dst, true).createMessage(), request.initiator);
+        logger.info("{} streaming task succeed, returning response to {}", 
previewKind.logPrefix(desc.sessionId), request.initiator);
+        MessagingService.instance().sendOneWay(new SyncComplete(desc, 
request.src, request.dst, true, state.createSummaries()).createMessage(), 
request.initiator);
     }
 
     /**
@@ -92,6 +95,6 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
      */
     public void onFailure(Throwable t)
     {
-        MessagingService.instance().sendOneWay(new SyncComplete(desc, 
request.src, request.dst, false).createMessage(), request.initiator);
+        MessagingService.instance().sendOneWay(new SyncComplete(desc, 
request.src, request.dst, false, Collections.emptyList()).createMessage(), 
request.initiator);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/SyncStat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncStat.java 
b/src/java/org/apache/cassandra/repair/SyncStat.java
index 5721a20..dab5659 100644
--- a/src/java/org/apache/cassandra/repair/SyncStat.java
+++ b/src/java/org/apache/cassandra/repair/SyncStat.java
@@ -17,17 +17,33 @@
  */
 package org.apache.cassandra.repair;
 
+import java.util.List;
+
+import org.apache.cassandra.streaming.SessionSummary;
+
 /**
  * Statistics about synchronizing two replica
  */
 public class SyncStat
 {
     public final NodePair nodes;
-    public final long numberOfDifferences;
+    public final long numberOfDifferences; // TODO: revert to Range<Token>
+    public final List<SessionSummary> summaries; // null if the two-argument constructor was used
 
     public SyncStat(NodePair nodes, long numberOfDifferences)
     {
+        this(nodes, numberOfDifferences, null); // leaves summaries null
+    }
+
+    public SyncStat(NodePair nodes, long numberOfDifferences, 
List<SessionSummary> summaries)
+    {
         this.nodes = nodes;
         this.numberOfDifferences = numberOfDifferences;
+        this.summaries = summaries;
+    }
+
+    public SyncStat withSummaries(List<SessionSummary> summaries)
+    {
+        return new SyncStat(nodes, numberOfDifferences, summaries); // returns a new instance; this one is not modified
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/SyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java 
b/src/java/org/apache/cassandra/repair/SyncTask.java
index 8adec6f..6f3157a 100644
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.MerkleTrees;
 
@@ -39,14 +40,16 @@ public abstract class SyncTask extends 
AbstractFuture<SyncStat> implements Runna
     protected final RepairJobDesc desc;
     protected final TreeResponse r1;
     protected final TreeResponse r2;
+    protected final PreviewKind previewKind;
 
     protected volatile SyncStat stat;
 
-    public SyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2)
+    public SyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, 
PreviewKind previewKind)
     {
         this.desc = desc;
         this.r1 = r1;
         this.r2 = r2;
+        this.previewKind = previewKind;
     }
 
     /**
@@ -60,7 +63,7 @@ public abstract class SyncTask extends 
AbstractFuture<SyncStat> implements Runna
         stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), 
differences.size());
 
         // choose a repair method based on the significance of the difference
-        String format = String.format("[repair #%s] Endpoints %s and %s %%s 
for %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
+        String format = String.format("%s Endpoints %s and %s %%s for %s", 
previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, 
desc.columnFamily);
         if (differences.isEmpty())
         {
             logger.info(String.format(format, "are consistent"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/ValidationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java 
b/src/java/org/apache/cassandra/repair/ValidationTask.java
index 177ad3e..f68d3c5 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.AbstractFuture;
 import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.MerkleTrees;
 
 /**
@@ -35,12 +36,14 @@ public class ValidationTask extends 
AbstractFuture<TreeResponse> implements Runn
     private final RepairJobDesc desc;
     private final InetAddress endpoint;
     private final int gcBefore;
+    private final PreviewKind previewKind;
 
-    public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int 
gcBefore)
+    public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int 
gcBefore, PreviewKind previewKind)
     {
         this.desc = desc;
         this.endpoint = endpoint;
         this.gcBefore = gcBefore;
+        this.previewKind = previewKind;
     }
 
     /**
@@ -61,7 +64,7 @@ public class ValidationTask extends 
AbstractFuture<TreeResponse> implements Runn
     {
         if (trees == null)
         {
-            setException(new RepairException(desc, "Validation failed in " + 
endpoint));
+            setException(new RepairException(desc, previewKind, "Validation 
failed in " + endpoint));
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java 
b/src/java/org/apache/cassandra/repair/Validator.java
index e8e3621..ba1fa9d 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
@@ -71,22 +72,25 @@ public class Validator implements Runnable
     // last key seen
     private DecoratedKey lastKey;
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
+    private final PreviewKind previewKind;
+
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
PreviewKind previewKind)
     {
-        this(desc, initiator, gcBefore, false, false);
+        this(desc, initiator, gcBefore, false, false, previewKind);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
boolean isConsistent)
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
boolean isConsistent, PreviewKind previewKind)
     {
-        this(desc, initiator, gcBefore, false, isConsistent);
+        this(desc, initiator, gcBefore, false, isConsistent, previewKind);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
boolean evenTreeDistribution, boolean isConsistent)
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
boolean evenTreeDistribution, boolean isConsistent, PreviewKind previewKind)
     {
         this.desc = desc;
         this.initiator = initiator;
         this.gcBefore = gcBefore;
         this.isConsistent = isConsistent;
+        this.previewKind = previewKind;
         validated = 0;
         range = null;
         ranges = null;
@@ -285,7 +289,7 @@ public class Validator implements Runnable
         // respond to the request that triggered this validation
         if (!initiator.equals(FBUtilities.getBroadcastAddress()))
         {
-            logger.info("[repair #{}] Sending completed merkle tree to {} for 
{}.{}", desc.sessionId, initiator, desc.keyspace, desc.columnFamily);
+            logger.info("{} Sending completed merkle tree to {} for {}.{}", 
previewKind.logPrefix(desc.sessionId), initiator, desc.keyspace, 
desc.columnFamily);
             Tracing.traceRepair("Sending completed merkle tree to {} for 
{}.{}", initiator, desc.keyspace, desc.columnFamily);
         }
         MessagingService.instance().sendOneWay(new ValidationComplete(desc, 
trees).createMessage(), initiator);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java 
b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
new file mode 100644
index 0000000..015b558
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.repair.RepairResult;
+import org.apache.cassandra.repair.RepairSessionResult;
+import org.apache.cassandra.repair.SyncStat;
+import org.apache.cassandra.streaming.SessionSummary;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.collect.Iterables.filter;
+
+/**
+ * Aggregates the stream summaries attached to {@link SyncStat}s into totals per table and
+ * per (source, destination) endpoint pair, and renders them as a plain text report.
+ */
+public class SyncStatSummary
+{
+
+    /**
+     * Running totals for the streams going from {@code src} to {@code dst} for one table.
+     */
+    private static class Session
+    {
+        final InetAddress src;
+        final InetAddress dst;
+
+        int files = 0;
+        long bytes = 0;
+        long ranges = 0;
+
+        Session(InetAddress src, InetAddress dst)
+        {
+            this.src = src;
+            this.dst = dst;
+        }
+
+        void consumeSummary(StreamSummary summary)
+        {
+            files += summary.files;
+            bytes += summary.totalSize;
+        }
+
+        void consumeSummaries(Collection<StreamSummary> summaries, long numRanges)
+        {
+            summaries.forEach(this::consumeSummary);
+            ranges += numRanges;
+        }
+
+        public String toString()
+        {
+            return String.format("%s -> %s: %s ranges, %s sstables, %s bytes", src, dst, ranges, files, FBUtilities.prettyPrintMemory(bytes));
+        }
+    }
+
+    /**
+     * Totals for a single table, kept per stream direction in {@link #sessions}.
+     */
+    private static class Table
+    {
+        final String keyspace;
+
+        final String table;
+
+        int files = -1;
+        long bytes = -1;
+        // long to match Session.ranges: an int here would narrow silently on +=
+        long ranges = -1;
+
+        final Map<Pair<InetAddress, InetAddress>, Session> sessions = new HashMap<>();
+
+        Table(String keyspace, String table)
+        {
+            this.keyspace = keyspace;
+            this.table = table;
+        }
+
+        Session getOrCreate(InetAddress from, InetAddress to)
+        {
+            return sessions.computeIfAbsent(Pair.create(from, to), k -> new Session(from, to));
+        }
+
+        /**
+         * Adds each session summary of {@code stat} to both directions between coordinator
+         * and peer; {@code stat.numberOfDifferences} is counted as ranges in each direction.
+         */
+        void consumeStat(SyncStat stat)
+        {
+            for (SessionSummary summary: stat.summaries)
+            {
+                getOrCreate(summary.coordinator, summary.peer).consumeSummaries(summary.sendingSummaries, stat.numberOfDifferences);
+                getOrCreate(summary.peer, summary.coordinator).consumeSummaries(summary.receivingSummaries, stat.numberOfDifferences);
+            }
+        }
+
+        void consumeStats(List<SyncStat> stats)
+        {
+            // stats whose summaries field is null are skipped
+            filter(stats, s -> s.summaries != null).forEach(this::consumeStat);
+        }
+
+        void calculateTotals()
+        {
+            files = 0;
+            bytes = 0;
+            ranges = 0;
+            for (Session session: sessions.values())
+            {
+                files += session.files;
+                bytes += session.bytes;
+                ranges += session.ranges;
+            }
+        }
+
+        public String toString()
+        {
+            // always recompute: a cached total would go stale if stats are consumed afterwards
+            calculateTotals();
+            StringBuilder output = new StringBuilder();
+
+            output.append(String.format("%s.%s - %s ranges, %s sstables, %s bytes\n", keyspace, table, ranges, files, FBUtilities.prettyPrintMemory(bytes)));
+            for (Session session: sessions.values())
+            {
+                output.append("    ").append(session.toString()).append('\n');
+            }
+            return output.toString();
+        }
+    }
+
+    private final Map<Pair<String, String>, Table> summaries = new HashMap<>();
+    private final boolean isEstimate;
+
+    private int files = -1;
+    private long bytes = -1;
+    private long ranges = -1;
+
+    /**
+     * @param isEstimate if true the report header reads "Total estimated streaming",
+     *                   otherwise "Total streaming"
+     */
+    public SyncStatSummary(boolean isEstimate)
+    {
+        this.isEstimate = isEstimate;
+    }
+
+    /**
+     * Folds the sync stats of one repair job into the entry for its keyspace and table.
+     */
+    public void consumeRepairResult(RepairResult result)
+    {
+        Pair<String, String> cf = Pair.create(result.desc.keyspace, result.desc.columnFamily);
+        summaries.computeIfAbsent(cf, k -> new Table(k.left, k.right)).consumeStats(result.stats);
+    }
+
+    /**
+     * Consumes every non-null job result of every non-null session result; a null list is ignored.
+     */
+    public void consumeSessionResults(List<RepairSessionResult> results)
+    {
+        if (results != null)
+        {
+            filter(results, Objects::nonNull).forEach(r -> filter(r.repairJobResults, Objects::nonNull).forEach(this::consumeRepairResult));
+        }
+    }
+
+    /**
+     * @return true if no ranges, sstables or bytes were recorded for any table
+     */
+    public boolean isEmpty()
+    {
+        calculateTotals();
+        return files == 0 && bytes == 0 && ranges == 0;
+    }
+
+    private void calculateTotals()
+    {
+        files = 0;
+        bytes = 0;
+        ranges = 0;
+        for (Table table: summaries.values())
+        {
+            // each table is totalled exactly once per pass
+            table.calculateTotals();
+            files += table.files;
+            bytes += table.bytes;
+            ranges += table.ranges;
+        }
+    }
+
+    /**
+     * Renders the grand total followed by one section per table, sorted by keyspace then table.
+     */
+    public String toString()
+    {
+        List<Pair<String, String>> tables = Lists.newArrayList(summaries.keySet());
+        tables.sort((o1, o2) ->
+            {
+                int ks = o1.left.compareTo(o2.left);
+                return ks != 0 ? ks : o1.right.compareTo(o2.right);
+            });
+
+        calculateTotals();
+
+        StringBuilder output = new StringBuilder();
+
+        if (isEstimate)
+        {
+            output.append(String.format("Total estimated streaming: %s ranges, %s sstables, %s bytes\n", ranges, files, FBUtilities.prettyPrintMemory(bytes)));
+        }
+        else
+        {
+            output.append(String.format("Total streaming: %s ranges, %s sstables, %s bytes\n", ranges, files, FBUtilities.prettyPrintMemory(bytes)));
+        }
+
+        for (Pair<String, String> tableName: tables)
+        {
+            Table table = summaries.get(tableName);
+            output.append(table.toString()).append('\n');
+        }
+
+        return output.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java 
b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 9903114..4d59942 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 
@@ -44,8 +45,9 @@ public class PrepareMessage extends RepairMessage
     public final boolean isIncremental;
     public final long timestamp;
     public final boolean isGlobal;
+    public final PreviewKind previewKind;
 
-    public PrepareMessage(UUID parentRepairSession, List<TableId> tableIds, 
Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean 
isGlobal)
+    public PrepareMessage(UUID parentRepairSession, List<TableId> tableIds, 
Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean 
isGlobal, PreviewKind previewKind)
     {
         super(Type.PREPARE_MESSAGE, null);
         this.parentRepairSession = parentRepairSession;
@@ -54,6 +56,7 @@ public class PrepareMessage extends RepairMessage
         this.isIncremental = isIncremental;
         this.timestamp = timestamp;
         this.isGlobal = isGlobal;
+        this.previewKind = previewKind;
     }
 
     @Override
@@ -66,6 +69,7 @@ public class PrepareMessage extends RepairMessage
                parentRepairSession.equals(other.parentRepairSession) &&
                isIncremental == other.isIncremental &&
                isGlobal == other.isGlobal &&
+               previewKind == other.previewKind &&
                timestamp == other.timestamp &&
                tableIds.equals(other.tableIds) &&
                ranges.equals(other.ranges);
@@ -74,7 +78,7 @@ public class PrepareMessage extends RepairMessage
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType, parentRepairSession, isGlobal, 
isIncremental, timestamp, tableIds, ranges);
+        return Objects.hash(messageType, parentRepairSession, isGlobal, 
previewKind, isIncremental, timestamp, tableIds, ranges);
     }
 
     public static class PrepareMessageSerializer implements 
MessageSerializer<PrepareMessage>
@@ -94,6 +98,7 @@ public class PrepareMessage extends RepairMessage
             out.writeBoolean(message.isIncremental);
             out.writeLong(message.timestamp);
             out.writeBoolean(message.isGlobal);
+            out.writeInt(message.previewKind.getSerializationVal());
         }
 
         public PrepareMessage deserialize(DataInputPlus in, int version) 
throws IOException
@@ -110,7 +115,8 @@ public class PrepareMessage extends RepairMessage
             boolean isIncremental = in.readBoolean();
             long timestamp = in.readLong();
             boolean isGlobal = in.readBoolean();
-            return new PrepareMessage(parentRepairSession, tableIds, ranges, 
isIncremental, timestamp, isGlobal);
+            PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
+            return new PrepareMessage(parentRepairSession, tableIds, ranges, 
isIncremental, timestamp, isGlobal, previewKind);
         }
 
         public long serializedSize(PrepareMessage message, int version)
@@ -126,6 +132,7 @@ public class PrepareMessage extends RepairMessage
             size += TypeSizes.sizeof(message.isIncremental);
             size += TypeSizes.sizeof(message.timestamp);
             size += TypeSizes.sizeof(message.isGlobal);
+            size += 
TypeSizes.sizeof(message.previewKind.getSerializationVal());
             return size;
         }
     }

Reply via email to