This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e87a1e0  Expose streaming as a vtable
e87a1e0 is described below

commit e87a1e0c0a19c64ed2edc2d340c0f8af16776e2c
Author: David Capwell <[email protected]>
AuthorDate: Tue Mar 1 13:15:18 2022 -0800

    Expose streaming as a vtable
    
    patch by David Capwell; reviewed by Dinesh Joshi, Paulo Motta for 
CASSANDRA-17390
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  28 ++
 .../org/apache/cassandra/config/DurationSpec.java  |  24 ++
 .../cassandra/db/virtual/AbstractVirtualTable.java |   6 +
 .../apache/cassandra/db/virtual/SimpleDataSet.java |   6 +-
 .../db/virtual/StreamingVirtualTable.java          | 109 ++++++
 .../cassandra/db/virtual/SystemViewsKeyspace.java  |   1 +
 .../apache/cassandra/service/CassandraDaemon.java  |   2 +
 .../apache/cassandra/streaming/SessionInfo.java    |  11 +-
 .../apache/cassandra/streaming/StreamManager.java  | 145 ++++++-
 .../cassandra/streaming/StreamResultFuture.java    |  13 +-
 .../apache/cassandra/streaming/StreamingState.java | 432 +++++++++++++++++++++
 .../management/StreamEventJMXNotifier.java         |   2 +-
 .../tools/nodetool/formatter/TableBuilder.java     |  20 +
 .../cassandra/distributed/impl/Instance.java       |   3 +
 .../test/streaming/RebuildStreamingTest.java       |  96 +++++
 .../distributed/util/QueryResultUtil.java          |  70 +++-
 test/unit/org/apache/cassandra/cql3/CQLTester.java |  17 +-
 .../unit/org/apache/cassandra/db/MmapFileTest.java |  10 +
 .../cassandra/db/filter/ColumnFilterTest.java      |   2 +
 .../db/virtual/StreamingVirtualTableTest.java      | 219 +++++++++++
 .../org/apache/cassandra/io/util/FileTest.java     |   4 +
 .../AbstractFilesystemOwnershipCheckTest.java      |   9 +
 .../apache/cassandra/utils/binlog/BinLogTest.java  |   9 +
 25 files changed, 1214 insertions(+), 27 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5580102..97ce6c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Expose streaming as a vtable (CASSANDRA-17390)
  * Make startup checks configurable (CASSANDRA-17220)
  * Add guardrail for number of partition keys on IN queries (CASSANDRA-17186)
  * update Python test framework from nose to pytest (CASSANDRA-17293)
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 15226cf..964a5e1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -768,6 +768,8 @@ public class Config
     public volatile Set<String> table_properties_disallowed = 
Collections.emptySet();
     public volatile boolean user_timestamps_enabled = true;
     public volatile boolean read_before_write_list_operations_enabled = true;
+    public volatile DurationSpec streaming_state_expires = 
DurationSpec.inDays(3);
+    public volatile DataStorageSpec streaming_state_size = 
DataStorageSpec.inMebibytes(40);
 
     /** The configuration of startup checks. */
     public volatile Map<StartupCheckType, Map<String, Object>> startup_checks 
= new HashMap<>();
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9f88ce0..65cb3d6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3885,4 +3885,32 @@ public class DatabaseDescriptor
             conf.force_new_prepared_statement_behaviour = value;
         }
     }
+
+    public static DurationSpec getStreamingStateExpires()
+    {
+        return conf.streaming_state_expires;
+    }
+
+    public static void setStreamingStateExpires(DurationSpec duration)
+    {
+        if 
(!conf.streaming_state_expires.equals(Objects.requireNonNull(duration, 
"duration")))
+        {
+            logger.info("Setting streaming_state_expires to {}", duration);
+            conf.streaming_state_expires = duration;
+        }
+    }
+
+    public static DataStorageSpec getStreamingStateSize()
+    {
+        return conf.streaming_state_size;
+    }
+
+    public static void setStreamingStateSize(DataStorageSpec duration)
+    {
+        if (!conf.streaming_state_size.equals(Objects.requireNonNull(duration, 
"duration")))
+        {
+            logger.info("Setting streaming_state_size to {}", duration);
+            conf.streaming_state_size = duration;
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/config/DurationSpec.java 
b/src/java/org/apache/cassandra/config/DurationSpec.java
index f06c247..e5edb35 100644
--- a/src/java/org/apache/cassandra/config/DurationSpec.java
+++ b/src/java/org/apache/cassandra/config/DurationSpec.java
@@ -129,6 +129,19 @@ public class DurationSpec
         }
     }
 
+    // get vs no-get prefix is not consistent in the code base, but for 
classes involved with config parsing, it is
+    // important to be explicit about get/set as this changes how parsing is 
done; this class is a data-type, so is
+    // not nested, having get/set can confuse parsing into thinking this is a 
nested type
+    public long quantity()
+    {
+        return quantity;
+    }
+
+    public TimeUnit unit()
+    {
+        return unit;
+    }
+
     /**
      * Creates a {@code DurationSpec} of the specified amount of milliseconds.
      *
@@ -179,6 +192,17 @@ public class DurationSpec
     }
 
     /**
+     * Creates a {@code DurationSpec} of the specified amount of days.
+     *
+     * @param days the amount of days
+     * @return a duration
+     */
+    public static DurationSpec inDays(long days)
+    {
+        return new DurationSpec(days, DAYS);
+    }
+
+    /**
      * Creates a {@code DurationSpec} of the specified amount of seconds. 
Custom method for special cases.
      *
      * @param value which can be in the old form only presenting the quantity 
or the post CASSANDRA-15234 form - a
diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java 
b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
index b20ac43..cf90c42 100644
--- a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
@@ -135,6 +135,12 @@ public abstract class AbstractVirtualTable implements 
VirtualTable
         throw new InvalidRequestException("Truncation is not supported by 
table " + metadata);
     }
 
+    @Override
+    public String toString()
+    {
+        return metadata().toString();
+    }
+
     public interface DataSet
     {
         boolean isEmpty();
diff --git a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java 
b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
index b8cb9f5..4150783 100644
--- a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
+++ b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
@@ -176,8 +176,10 @@ public class SimpleDataSet extends 
AbstractVirtualTable.AbstractDataSet
         private void add(String columnName, Object value)
         {
             ColumnMetadata column = 
metadata.getColumn(ByteBufferUtil.bytes(columnName));
-            if (null == column || !column.isRegular())
-                throw new IllegalArgumentException();
+            if (column == null)
+                throw new IllegalArgumentException("Unknown column: " + 
columnName);
+            if (!column.isRegular())
+                throw new IllegalArgumentException(String.format("Expect a 
regular column %s, but got %s", columnName, column.kind));
             values.put(column, value);
         }
 
diff --git 
a/src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java 
b/src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java
new file mode 100644
index 0000000..1036f18
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamingState;
+
+import static 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
+
+public class StreamingVirtualTable extends AbstractVirtualTable
+{
+    public StreamingVirtualTable(String keyspace)
+    {
+        super(parse("CREATE TABLE streaming (" +
+                    "  id uuid,\n" +
+                    "  follower boolean,\n" +
+                    "  operation text, \n" +
+                    "  peers frozen<list<text>>,\n" +
+                    "  status text,\n" +
+                    "  progress_percentage float,\n" +
+                    "  last_updated_at timestamp,\n" +
+                    "  duration_millis bigint,\n" +
+                    "  failure_cause text,\n" +
+                    "  success_message text,\n" +
+                    "\n" +
+                    StreamingState.Sessions.columns() +
+                    "\n" +
+                    stateColumns() +
+                    "\n" +
+                    "PRIMARY KEY ((id))" +
+                    ")", keyspace)
+              .kind(TableMetadata.Kind.VIRTUAL)
+              .build());
+    }
+
+    private static String stateColumns()
+    {
+        StringBuilder sb = new StringBuilder();
+        for (StreamingState.Status state : StreamingState.Status.values())
+            sb.append("  
status_").append(state.name().toLowerCase()).append("_timestamp timestamp,\n");
+        return sb.toString();
+    }
+
+    @Override
+    public DataSet data()
+    {
+        SimpleDataSet result = new SimpleDataSet(metadata());
+        StreamManager.instance.getStreamingStates()
+                              .forEach(s -> updateDataSet(result, s));
+        return result;
+    }
+
+    @Override
+    public DataSet data(DecoratedKey partitionKey)
+    {
+        UUID id = UUIDType.instance.compose(partitionKey.getKey());
+        SimpleDataSet result = new SimpleDataSet(metadata());
+        StreamingState state = StreamManager.instance.getStreamingState(id);
+        if (state != null)
+            updateDataSet(result, state);
+        return result;
+    }
+
+    private void updateDataSet(SimpleDataSet ds, StreamingState state)
+    {
+        ds.row(state.id());
+        ds.column("last_updated_at", new Date(state.lastUpdatedAtMillis())); 
// read early to see latest state
+        ds.column("follower", state.follower());
+        ds.column("operation", state.operation().getDescription());
+        ds.column("peers", 
state.peers().stream().map(Object::toString).collect(Collectors.toList()));
+        ds.column("status", state.status().name().toLowerCase());
+        ds.column("progress_percentage", round(state.progress() * 100));
+        ds.column("duration_millis", state.durationMillis());
+        ds.column("failure_cause", state.failureCause());
+        ds.column("success_message", state.successMessage());
+        for (Map.Entry<StreamingState.Status, Long> e : 
state.stateTimesMillis().entrySet())
+            ds.column("status_" + e.getKey().name().toLowerCase() + 
"_timestamp", new Date(e.getValue()));
+
+        state.sessions().update(ds);
+    }
+
+    static float round(float value)
+    {
+        return Math.round(value * 100) / 100;
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 6fe189e..fc0f40a 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -45,6 +45,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
                     .add(new RolesCacheKeysTable(VIRTUAL_VIEWS))
                     .add(new CQLMetricsTable(VIRTUAL_VIEWS))
                     .add(new BatchMetricsTable(VIRTUAL_VIEWS))
+                    .add(new StreamingVirtualTable(VIRTUAL_VIEWS))
                     .build());
     }
 }
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 82eadbc..67195d7 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -77,6 +77,7 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.security.ThreadAwareSecurityManager;
+import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JMXServerUtils;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -370,6 +371,7 @@ public class CassandraDaemon
             
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance,
 30, sizeRecorderInterval, TimeUnit.SECONDS);
 
         ActiveRepairService.instance.start();
+        StreamManager.instance.start();
 
         // Prepared statements
         QueryProcessor.instance.preloadPreparedStatements();
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java 
b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index 31cff9b..55f1e44 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -44,8 +44,8 @@ public final class SessionInfo implements Serializable
     /** Current session state */
     public final StreamSession.State state;
 
-    private final Map<String, ProgressInfo> receivingFiles;
-    private final Map<String, ProgressInfo> sendingFiles;
+    private final Map<String, ProgressInfo> receivingFiles = new 
ConcurrentHashMap<>();
+    private final Map<String, ProgressInfo> sendingFiles = new 
ConcurrentHashMap<>();
 
     public SessionInfo(InetSocketAddress peer,
                        int sessionIndex,
@@ -59,11 +59,14 @@ public final class SessionInfo implements Serializable
         this.connecting = connecting;
         this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
         this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
-        this.receivingFiles = new ConcurrentHashMap<>();
-        this.sendingFiles = new ConcurrentHashMap<>();
         this.state = state;
     }
 
+    public SessionInfo(SessionInfo other)
+    {
+        this(other.peer, other.sessionIndex, other.connecting, 
other.receivingSummaries, other.sendingSummaries, other.state);
+    }
+
     public boolean isFailed()
     {
         return state == StreamSession.State.FAILED;
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java 
b/src/java/org/apache/cassandra/streaming/StreamManager.java
index bec112b..86b42ce 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -17,9 +17,11 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
 import javax.management.ListenerNotFoundException;
 import javax.management.MBeanNotificationInfo;
 import javax.management.NotificationFilter;
@@ -28,12 +30,18 @@ import javax.management.openmbean.CompositeData;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
 import org.apache.cassandra.streaming.management.StreamStateCompositeData;
@@ -45,6 +53,8 @@ import 
org.apache.cassandra.streaming.management.StreamStateCompositeData;
  */
 public class StreamManager implements StreamManagerMBean
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamManager.class);
+
     public static final StreamManager instance = new StreamManager();
 
     /**
@@ -204,6 +214,7 @@ public class StreamManager implements StreamManagerMBean
     }
 
     private final StreamEventJMXNotifier notifier = new 
StreamEventJMXNotifier();
+    private final CopyOnWriteArrayList<StreamListener> listeners = new 
CopyOnWriteArrayList<>();
 
     /*
      * Currently running streams. Removed after completion/failure.
@@ -213,6 +224,83 @@ public class StreamManager implements StreamManagerMBean
     private final Map<UUID, StreamResultFuture> initiatorStreams = new 
NonBlockingHashMap<>();
     private final Map<UUID, StreamResultFuture> followerStreams = new 
NonBlockingHashMap<>();
 
+    private final Cache<UUID, StreamingState> states;
+    private final StreamListener listener = new StreamListener()
+    {
+        @Override
+        public void onRegister(StreamResultFuture result)
+        {
+            // reason for synchronized rather than states.get is to detect 
duplicates
+            // streaming shouldn't be producing duplicates as that would imply 
a planId collision
+            synchronized (states)
+            {
+                StreamingState previous = states.getIfPresent(result.planId);
+                if (previous == null)
+                {
+                    StreamingState state = new StreamingState(result);
+                    states.put(state.id(), state);
+                    state.phase.start();
+                    result.addEventListener(state);
+                }
+                else
+                {
+                    logger.warn("Duplicate streaming states detected for id 
{}", result.planId);
+                }
+            }
+        }
+    };
+
+    public StreamManager()
+    {
+        DurationSpec duration = DatabaseDescriptor.getStreamingStateExpires();
+        long sizeBytes = DatabaseDescriptor.getStreamingStateSize().toBytes();
+        long numElements = sizeBytes / StreamingState.ELEMENT_SIZE;
+        logger.info("Storing streaming state for {} or for {} elements", 
duration, numElements);
+        states = CacheBuilder.newBuilder()
+                             .expireAfterWrite(duration.quantity(), 
duration.unit())
+                             .maximumSize(numElements)
+                             .build();
+    }
+
+    public void start()
+    {
+        addListener(listener);
+    }
+
+    public void stop()
+    {
+        removeListener(listener);
+    }
+
+    public Collection<StreamingState> getStreamingStates()
+    {
+        return states.asMap().values();
+    }
+
+    public StreamingState getStreamingState(UUID id)
+    {
+        return states.getIfPresent(id);
+    }
+
+    @VisibleForTesting
+    public void putStreamingState(StreamingState state)
+    {
+        synchronized (states)
+        {
+            StreamingState previous = states.getIfPresent(state.id());
+            if (previous != null)
+                throw new AssertionError("StreamPlan id " + state.id() + " 
already exists");
+            states.put(state.id(), state);
+        }
+    }
+
+    @VisibleForTesting
+    public void clearStates()
+    {
+        // states.cleanUp() doesn't clear, it looks to only run gc on things 
that could be removed... this method should remove all state
+        states.asMap().clear();
+    }
+
     public Set<CompositeData> getCurrentStreams()
     {
         return 
Sets.newHashSet(Iterables.transform(Iterables.concat(initiatorStreams.values(), 
followerStreams.values()), new Function<StreamResultFuture, CompositeData>()
@@ -231,6 +319,7 @@ public class StreamManager implements StreamManagerMBean
         result.addListener(() -> initiatorStreams.remove(result.planId));
 
         initiatorStreams.put(result.planId, result);
+        notifySafeOnRegister(result);
     }
 
     public StreamResultFuture registerFollower(final StreamResultFuture result)
@@ -240,7 +329,51 @@ public class StreamManager implements StreamManagerMBean
         result.addListener(() -> followerStreams.remove(result.planId));
 
         StreamResultFuture previous = 
followerStreams.putIfAbsent(result.planId, result);
-        return previous == null ? result : previous;
+        if (previous == null)
+        {
+            notifySafeOnRegister(result);
+            return result;
+        }
+        return previous;
+    }
+
+    @VisibleForTesting
+    public void putInitiatorStream(StreamResultFuture future)
+    {
+        StreamResultFuture current = 
initiatorStreams.putIfAbsent(future.planId, future);
+        assert current == null: "Duplicat initiator stream for " + 
future.planId;
+    }
+
+    @VisibleForTesting
+    public void putFollowerStream(StreamResultFuture future)
+    {
+        StreamResultFuture current = 
followerStreams.putIfAbsent(future.planId, future);
+        assert current == null: "Duplicate follower stream for " + 
future.planId;
+    }
+
+    public void addListener(StreamListener listener)
+    {
+        listeners.add(listener);
+    }
+
+    public void removeListener(StreamListener listener)
+    {
+        listeners.remove(listener);
+    }
+
+    private void notifySafeOnRegister(StreamResultFuture result)
+    {
+        for (StreamListener l : listeners)
+        {
+            try
+            {
+                l.onRegister(result);
+            }
+            catch (Throwable t)
+            {
+                logger.warn("Failed to notify stream listener of new 
Initiator/Follower", t);
+            }
+        }
     }
 
     public StreamResultFuture getReceivingStream(UUID planId)
@@ -248,6 +381,11 @@ public class StreamManager implements StreamManagerMBean
         return followerStreams.get(planId);
     }
 
+    public StreamResultFuture getInitiatorStream(UUID planId)
+    {
+        return initiatorStreams.get(planId);
+    }
+
     public void addNotificationListener(NotificationListener listener, 
NotificationFilter filter, Object handback)
     {
         notifier.addNotificationListener(listener, filter, handback);
@@ -282,4 +420,9 @@ public class StreamManager implements StreamManagerMBean
 
         return streamResultFuture.getSession(peer, sessionIndex);
     }
+
+    public interface StreamListener
+    {
+        default void onRegister(StreamResultFuture result) {}
+    }
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 66e99be..8f11587 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -62,7 +62,7 @@ public final class StreamResultFuture extends 
AsyncFuture<StreamState>
      * @param planId Stream plan ID
      * @param streamOperation Stream streamOperation
      */
-    private StreamResultFuture(UUID planId, StreamOperation streamOperation, 
StreamCoordinator coordinator)
+    public StreamResultFuture(UUID planId, StreamOperation streamOperation, 
StreamCoordinator coordinator)
     {
         this.planId = planId;
         this.streamOperation = streamOperation;
@@ -208,7 +208,16 @@ public final class StreamResultFuture extends 
AsyncFuture<StreamState>
     {
         // delegate to listener
         for (StreamEventHandler listener : eventListeners)
-            listener.handleStreamEvent(event);
+        {
+            try
+            {
+                listener.handleStreamEvent(event);
+            }
+            catch (Throwable t)
+            {
+                logger.warn("Unexpected exception in listern while calling 
handleStreamEvent", t);
+            }
+        }
     }
 
     private synchronized void maybeComplete()
diff --git a/src/java/org/apache/cassandra/streaming/StreamingState.java 
b/src/java/org/apache/cassandra/streaming/StreamingState.java
new file mode 100644
index 0000000..b7ef61e
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamingState.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.virtual.SimpleDataSet;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class StreamingState implements StreamEventHandler
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamingState.class);
+
+    public static final long ELEMENT_SIZE = ObjectSizes.measureDeep(new 
StreamingState(UUID.randomUUID(), StreamOperation.OTHER, false));
+
+    public enum Status
+    {INIT, START, SUCCESS, FAILURE}
+
+    private final long createdAtMillis = Clock.Global.currentTimeMillis();
+
+    // while streaming is running, this is a cache of StreamInfo seen with 
progress state
+    // the reason for the cache is that StreamSession drops data after tasks 
(receive/send) complete, this makes
+    // it so that current state of a future tracks work pending rather than 
work done, cache solves this by not deleting
+    // when tasks complete
+    // To lower memory costs, clear this after the stream completes
+    private ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = new 
ConcurrentHashMap<>();
+
+    private final UUID id;
+    private final boolean follower;
+    private final StreamOperation operation;
+    private Set<InetSocketAddress> peers = null;
+    private Sessions sessions = Sessions.EMPTY;
+
+    private Status status;
+    private String completeMessage = null;
+
+    private final long[] stateTimesNanos;
+    private volatile long lastUpdatedAtNanos;
+
+    // API for state changes
+    public final Phase phase = new Phase();
+
+    public StreamingState(StreamResultFuture result)
+    {
+        this(result.planId, result.streamOperation, 
result.getCoordinator().isFollower());
+    }
+
+    private StreamingState(UUID planId, StreamOperation streamOperation, 
boolean follower)
+    {
+        this.id = planId;
+        this.operation = streamOperation;
+        this.follower = follower;
+        this.stateTimesNanos = new long[Status.values().length];
+        updateState(Status.INIT);
+    }
+
+    public UUID id()
+    {
+        return id;
+    }
+
+    public boolean follower()
+    {
+        return follower;
+    }
+
+    public StreamOperation operation()
+    {
+        return operation;
+    }
+
+    public Set<InetSocketAddress> peers()
+    {
+        Set<InetSocketAddress> peers = this.peers;
+        if (peers != null)
+            return peers;
+        ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = 
this.streamProgress;
+        if (streamProgress != null)
+            return streamProgress.keySet();
+        return Collections.emptySet();
+    }
+
+    public Status status()
+    {
+        return status;
+    }
+
+    public Sessions sessions()
+    {
+        return sessions;
+    }
+
+    public boolean isComplete()
+    {
+        switch (status)
+        {
+            case SUCCESS:
+            case FAILURE:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    public StreamResultFuture future()
+    {
+        if (follower)
+            return StreamManager.instance.getReceivingStream(id);
+        else
+            return StreamManager.instance.getInitiatorStream(id);
+    }
+
+    public float progress()
+    {
+        switch (status)
+        {
+            case INIT:
+                return 0;
+            case START:
+                return Math.min(0.99f, sessions().progress().floatValue());
+            case SUCCESS:
+            case FAILURE:
+                return 1;
+            default:
+                throw new AssertionError("unknown state: " + status);
+        }
+    }
+
+    public EnumMap<Status, Long> stateTimesMillis()
+    {
+        EnumMap<Status, Long> map = new EnumMap<>(Status.class);
+        for (int i = 0; i < stateTimesNanos.length; i++)
+        {
+            long nanos = stateTimesNanos[i];
+            if (nanos != 0)
+                map.put(Status.values()[i], nanosToMillis(nanos));
+        }
+        return map;
+    }
+
+    public long durationMillis()
+    {
+        long endNanos = lastUpdatedAtNanos;
+        if (!isComplete())
+            endNanos = Clock.Global.nanoTime();
+        return TimeUnit.NANOSECONDS.toMillis(endNanos - stateTimesNanos[0]);
+    }
+
+    public long lastUpdatedAtMillis()
+    {
+        return nanosToMillis(lastUpdatedAtNanos);
+    }
+
+    public long lastUpdatedAtNanos()
+    {
+        return lastUpdatedAtNanos;
+    }
+
+    public String failureCause()
+    {
+        if (status == Status.FAILURE)
+            return completeMessage;
+        return null;
+    }
+
+    /** Completion message when the stream succeeded; null unless status is SUCCESS. */
+    public String successMessage()
+    {
+        if (status == Status.SUCCESS)
+            return completeMessage;
+        return null;
+    }
+
+    /** Human-readable dump of this stream's state, one "name value" row per attribute. */
+    @Override
+    public String toString()
+    {
+        TableBuilder table = new TableBuilder();
+        table.add("id", id.toString());
+        table.add("status", status().name().toLowerCase());
+        table.add("progress", (progress() * 100) + "%");
+        table.add("duration_ms", Long.toString(durationMillis()));
+        table.add("last_updated_ms", Long.toString(lastUpdatedAtMillis()));
+        table.add("failure_cause", failureCause());
+        table.add("success_message", successMessage());
+        for (Map.Entry<Status, Long> e : stateTimesMillis().entrySet())
+            // bug fix: print only the timestamp; e.toString() would render "STATUS=millis",
+            // duplicating the key already encoded in the row name
+            table.add("status_" + e.getKey().name().toLowerCase() + "_ms", e.getValue().toString());
+        return table.toString();
+    }
+
+    /**
+     * StreamEventHandler callback: folds the event into the per-peer SessionInfo map, then
+     * refreshes the aggregated {@link Sessions} counters and the last-updated timestamp.
+     *
+     * The field is read into a local first because onSuccess/onFailure null it out; a late event
+     * arriving after completion is logged and dropped instead of throwing a NPE. Event handling
+     * errors are swallowed (logged) so a bad event cannot break the stream itself.
+     */
+    @Override
+    public synchronized void handleStreamEvent(StreamEvent event)
+    {
+        ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = 
this.streamProgress;
+        if (streamProgress == null)
+        {
+            logger.warn("Got stream event {} after the stream completed", 
event.eventType);
+            return;
+        }
+        try
+        {
+            switch (event.eventType)
+            {
+                case STREAM_PREPARED:
+                    streamPrepared((StreamEvent.SessionPreparedEvent) event);
+                    break;
+                case STREAM_COMPLETE:
+                    // currently not keeping track of per-session completion, so ignore
+                    break;
+                case FILE_PROGRESS:
+                    streamProgress((StreamEvent.ProgressEvent) event);
+                    break;
+                default:
+                    logger.warn("Unknown stream event type: {}", 
event.eventType);
+            }
+        }
+        catch (Throwable t)
+        {
+            logger.warn("Unexpected exception handling stream event", t);
+        }
+        sessions = Sessions.create(streamProgress.values());
+        lastUpdatedAtNanos = Clock.Global.nanoTime();
+    }
+
+    /** Registers the peer's SessionInfo on prepare; putIfAbsent keeps the first one on duplicates. */
+    private void streamPrepared(StreamEvent.SessionPreparedEvent event)
+    {
+        SessionInfo session = new SessionInfo(event.session);
+        streamProgress.putIfAbsent(session.peer, session);
+    }
+
+    /**
+     * Applies a file-progress update to the peer's SessionInfo; progress arriving before the
+     * session was prepared has nowhere to go and is only logged.
+     */
+    private void streamProgress(StreamEvent.ProgressEvent event)
+    {
+        SessionInfo info = streamProgress.get(event.progress.peer);
+        if (info != null)
+        {
+            info.updateProgress(event.progress);
+        }
+        else
+        {
+            // fixed log message: stray '}' after the stream id, and "Recieved" typo
+            logger.warn("[Stream #{} ID#{}] Received stream progress before prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer);
+        }
+    }
+
+    /**
+     * Terminal callback: snapshots the final session counters and peer set, releases the per-peer
+     * tracking map, and records SUCCESS. The null-check makes completion idempotent — a second
+     * terminal callback finds streamProgress already nulled and does nothing.
+     */
+    @Override
+    public synchronized void onSuccess(@Nullable StreamState state)
+    {
+        ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = 
this.streamProgress;
+        if (streamProgress != null)
+        {
+            sessions = Sessions.create(streamProgress.values());
+            peers = new HashSet<>(streamProgress.keySet());
+            this.streamProgress = null;
+            updateState(Status.SUCCESS);
+        }
+    }
+
+    /**
+     * Terminal callback for failure: snapshots counters/peers if still tracking, then records the
+     * full stack trace and marks FAILURE. Unlike onSuccess, the state transition happens even if
+     * the tracking map was already released, so the failure cause is never lost.
+     */
+    @Override
+    public synchronized void onFailure(Throwable throwable)
+    {
+        ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = 
this.streamProgress;
+        if (streamProgress != null)
+        {
+            sessions = Sessions.create(streamProgress.values());
+            peers = new HashSet<>(streamProgress.keySet());
+            this.streamProgress = null;
+        }
+        completeMessage = Throwables.getStackTraceAsString(throwable);
+        updateState(Status.FAILURE);
+    }
+
+    /** Transitions to the given status, stamping both the per-status and last-updated nano clocks. */
+    private synchronized void updateState(Status state)
+    {
+        this.status = state;
+        long now = Clock.Global.nanoTime();
+        stateTimesNanos[state.ordinal()] = now;
+        lastUpdatedAtNanos = now;
+    }
+
+    /**
+     * Converts a monotonic nano reading to approximate epoch millis by offsetting the creation
+     * wall-clock time with the elapsed time since INIT (stateTimesNanos[0]).
+     */
+    private long nanosToMillis(long nanos)
+    {
+        // nanos - creationTimeNanos = delta since init
+        return createdAtMillis + TimeUnit.NANOSECONDS.toMillis(nanos - 
stateTimesNanos[0]);
+    }
+
+    /**
+     * Lifecycle hooks for the owning StreamingState; deliberately a non-static inner class so
+     * start() can call the enclosing instance's updateState.
+     */
+    public class Phase
+    {
+        public void start()
+        {
+            updateState(Status.START);
+        }
+    }
+
+    /**
+     * Immutable aggregate of byte/file send+receive counters across all of a stream's sessions.
+     * EMPTY is the canonical all-zero instance; create() returns it so isEmpty() can use identity.
+     */
+    public static class Sessions
+    {
+        public static final Sessions EMPTY = new Sessions(0, 0, 0, 0, 0, 0, 0, 
0);
+
+        public final long bytesToReceive, bytesReceived;
+        public final long bytesToSend, bytesSent;
+        public final long filesToReceive, filesReceived;
+        public final long filesToSend, filesSent;
+
+        public Sessions(long bytesToReceive, long bytesReceived, long 
bytesToSend, long bytesSent, long filesToReceive, long filesReceived, long 
filesToSend, long filesSent)
+        {
+            this.bytesToReceive = bytesToReceive;
+            this.bytesReceived = bytesReceived;
+            this.bytesToSend = bytesToSend;
+            this.bytesSent = bytesSent;
+            this.filesToReceive = filesToReceive;
+            this.filesReceived = filesReceived;
+            this.filesToSend = filesToSend;
+            this.filesSent = filesSent;
+        }
+
+        /** CQL column definitions for these counters, for embedding in a vtable CREATE statement. */
+        public static String columns()
+        {
+            return "  bytes_to_receive bigint, \n" +
+                   "  bytes_received bigint, \n" +
+                   "  bytes_to_send bigint, \n" +
+                   "  bytes_sent bigint, \n" +
+                   "  files_to_receive bigint, \n" +
+                   "  files_received bigint, \n" +
+                   "  files_to_send bigint, \n" +
+                   "  files_sent bigint, \n";
+        }
+
+        /** Sums the counters of every session; returns the shared EMPTY instance when all are zero. */
+        public static Sessions create(Collection<SessionInfo> sessions)
+        {
+            long bytesToReceive = 0;
+            long bytesReceived = 0;
+            long filesToReceive = 0;
+            long filesReceived = 0;
+            long bytesToSend = 0;
+            long bytesSent = 0;
+            long filesToSend = 0;
+            long filesSent = 0;
+            for (SessionInfo session : sessions)
+            {
+                bytesToReceive += session.getTotalSizeToReceive();
+                bytesReceived += session.getTotalSizeReceived();
+
+                filesToReceive += session.getTotalFilesToReceive();
+                filesReceived += session.getTotalFilesReceived();
+
+                bytesToSend += session.getTotalSizeToSend();
+                bytesSent += session.getTotalSizeSent();
+
+                filesToSend += session.getTotalFilesToSend();
+                filesSent += session.getTotalFilesSent();
+            }
+            if (0 == bytesToReceive && 0 == bytesReceived && 0 == 
filesToReceive && 0 == filesReceived && 0 == bytesToSend && 0 == bytesSent && 0 
== filesToSend && 0 == filesSent)
+                return EMPTY;
+            return new Sessions(bytesToReceive, bytesReceived,
+                                bytesToSend, bytesSent,
+                                filesToReceive, filesReceived,
+                                filesToSend, filesSent);
+        }
+
+        public boolean isEmpty()
+        {
+            return this == EMPTY;
+        }
+
+        public BigDecimal receivedBytesPercent()
+        {
+            return div(bytesReceived, bytesToReceive);
+        }
+
+        public BigDecimal sentBytesPercent()
+        {
+            return div(bytesSent, bytesToSend);
+        }
+
+        /** Overall byte-weighted fraction of work done across both directions, in [0, 1]. */
+        public BigDecimal progress()
+        {
+            return div(bytesSent + bytesReceived, bytesToSend + 
bytesToReceive);
+        }
+
+        private static BigDecimal div(long a, long b)
+        {
+            // not "correct" but it's what you would do if this happened...
+            if (b == 0)
+                return BigDecimal.ZERO;
+            return BigDecimal.valueOf(a).divide(BigDecimal.valueOf(b), 4, 
RoundingMode.HALF_UP);
+        }
+
+        /** Writes the counters into the vtable row; an all-zero aggregate leaves the columns null. */
+        public void update(SimpleDataSet ds)
+        {
+            if (isEmpty())
+                return;
+            ds.column("bytes_to_receive", bytesToReceive)
+              .column("bytes_received", bytesReceived)
+              .column("bytes_to_send", bytesToSend)
+              .column("bytes_sent", bytesSent)
+              .column("files_to_receive", filesToReceive)
+              .column("files_received", filesReceived)
+              .column("files_to_send", filesToSend)
+              .column("files_sent", filesSent);
+        }
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
 
b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
index e504dae..b8c7487 100644
--- 
a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
+++ 
b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
@@ -87,7 +87,7 @@ public class StreamEventJMXNotifier extends 
NotificationBroadcasterSupport imple
         Notification notif = new 
Notification(StreamEvent.class.getCanonicalName() + ".failure",
                                               StreamManagerMBean.OBJECT_NAME,
                                               seq.getAndIncrement());
-        notif.setUserData(t.fillInStackTrace().toString());
+        notif.setUserData(t.toString());
         sendNotification(notif);
     }
 }
diff --git 
a/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java 
b/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java
index 166ed3d..025716b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java
@@ -18,7 +18,11 @@
 
 package org.apache.cassandra.tools.nodetool.formatter;
 
+import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -116,6 +120,22 @@ public class TableBuilder
         }
     }
 
+    /**
+     * Renders the table to a String by printing into an in-memory UTF-8 stream.
+     * UTF-8 is always supported by the JVM, so the catch is effectively unreachable.
+     */
+    @Override
+    public String toString()
+    {
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        try (PrintStream stream = new PrintStream(os, true, 
StandardCharsets.UTF_8.displayName()))
+        {
+            printTo(stream);
+            stream.flush();
+            return os.toString(StandardCharsets.UTF_8.displayName());
+        }
+        catch (UnsupportedEncodingException e)
+        {
+            throw new UncheckedIOException(e);
+        }
+    }
+
     /**
      * Share max offsets across multiple TableBuilders
      */
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index e5a933d..38cfef0 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -120,6 +120,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
 import org.apache.cassandra.service.snapshot.SnapshotManager;
+import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamReceiveTask;
 import org.apache.cassandra.streaming.StreamTransferTask;
 import org.apache.cassandra.streaming.async.NettyStreamingChannel;
@@ -667,6 +668,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                     throw new IllegalStateException(String.format("%s != %s", 
FBUtilities.getBroadcastAddressAndPort(), broadcastAddress()));
 
                 ActiveRepairService.instance.start();
+                StreamManager.instance.start();
                 CassandraDaemon.getInstanceForTesting().completeSetup();
             }
             catch (Throwable t)
@@ -740,6 +742,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                                 NettyStreamingChannel::shutdown,
                                 () -> StreamReceiveTask.shutdownAndWait(1L, 
MINUTES),
                                 () -> StreamTransferTask.shutdownAndWait(1L, 
MINUTES),
+                                () -> StreamManager.instance.stop(),
                                 () -> 
SecondaryIndexManager.shutdownAndWait(1L, MINUTES),
                                 () -> 
IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
                                 () -> 
ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
new file mode 100644
index 0000000..db89669
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end check of the system_views.streaming vtable: node2 rebuilds from node1, then both
+ * sides' vtable rows are asserted (node1 as follower/sender, node2 as initiator/receiver).
+ * Entire-sstable streaming is disabled so per-file progress events are exercised.
+ */
+public class RebuildStreamingTest extends TestBaseImpl
+{
+    @Test
+    public void test() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> 
c.with(Feature.values()).set("stream_entire_sstables", false))
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id 
varchar PRIMARY KEY);"));
+            cluster.stream().forEach(i -> 
i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success());
+            IInvokableInstance first = cluster.get(1);
+            IInvokableInstance second = cluster.get(2);
+            // one flush per insert => a known number of sstables (files) to stream
+            long expectedFiles = 10;
+            for (int i = 0; i < expectedFiles; i++)
+            {
+                first.executeInternal(withKeyspace("insert into 
%s.users(user_id) values (?)"), "dcapwell" + i);
+                first.flush(KEYSPACE);
+            }
+
+            second.nodetoolResult("rebuild", "--keyspace", 
KEYSPACE).asserts().success();
+
+            // node1 was the sender (follower side of the rebuild-initiated stream)
+            SimpleQueryResult qr = first.executeInternalWithResult("SELECT * 
FROM system_views.streaming");
+            String txt = QueryResultUtil.expand(qr);
+            qr.reset();
+            assertThat(qr.toObjectArrays().length).describedAs("Found 
rows\n%s", txt).isEqualTo(1);
+            assertThat(qr.hasNext()).isTrue();
+            Row row = qr.next();
+            QueryResultUtil.assertThat(row)
+                           .isEqualTo("peers", 
Collections.singletonList("/127.0.0.2:7012"))
+                           .isEqualTo("follower", true)
+                           .isEqualTo("operation", "Rebuild")
+                           .isEqualTo("status", "success")
+                           .isEqualTo("progress_percentage", 100.0F)
+                           .isEqualTo("success_message", 
null).isEqualTo("failure_cause", null)
+                           .isEqualTo("files_sent", expectedFiles)
+                           .columnsEqualTo("files_sent", "files_to_send")
+                           .columnsEqualTo("bytes_sent", "bytes_to_send")
+                           .isEqualTo("files_received", 0L)
+                           .columnsEqualTo("files_received", 
"files_to_receive", "bytes_received", "bytes_to_receive");
+            long totalBytes = row.getLong("bytes_sent");
+            assertThat(totalBytes).isGreaterThan(0);
+
+            // node2 initiated the rebuild and received everything node1 sent
+            qr = second.executeInternalWithResult("SELECT * FROM 
system_views.streaming");
+            txt = QueryResultUtil.expand(qr);
+            qr.reset();
+            assertThat(qr.toObjectArrays().length).describedAs("Found 
rows\n%s", txt).isEqualTo(1);
+            assertThat(qr.hasNext()).isTrue();
+
+            QueryResultUtil.assertThat(qr.next())
+                           .isEqualTo("peers", 
Collections.singletonList("/127.0.0.1:7012"))
+                           .isEqualTo("follower", false)
+                           .isEqualTo("operation", "Rebuild")
+                           .isEqualTo("status", "success")
+                           .isEqualTo("progress_percentage", 100.0F)
+                           .isEqualTo("success_message", 
null).isEqualTo("failure_cause", null)
+                           .columnsEqualTo("files_to_receive", 
"files_received").isEqualTo("files_received", expectedFiles)
+                           .columnsEqualTo("bytes_to_receive", 
"bytes_received").isEqualTo("bytes_received", totalBytes)
+                           .columnsEqualTo("files_sent", "files_to_send", 
"bytes_sent", "bytes_to_send").isEqualTo("files_sent", 0L);
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java 
b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
index f9c1d90..58842bc 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
@@ -23,6 +23,7 @@ import java.util.function.Predicate;
 
 import org.apache.cassandra.distributed.api.Row;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
 import org.assertj.core.api.Assertions;
 
 public class QueryResultUtil
@@ -69,21 +70,70 @@ public class QueryResultUtil
         return true;
     }
 
-    public static AssertHelper assertThat(SimpleQueryResult qr)
+    public static SimpleQueryResultAssertHelper assertThat(SimpleQueryResult 
qr)
     {
-        return new AssertHelper(qr);
+        return new SimpleQueryResultAssertHelper(qr);
     }
 
-    public static class AssertHelper
+    public static RowAssertHelper assertThat(Row row)
+    {
+        return new RowAssertHelper(row);
+    }
+
+    /**
+     * Pretty-prints every row of the result as a "@ Row N" header followed by a column/value
+     * table; used to attach full result contents to assertion failure messages.
+     */
+    public static String expand(SimpleQueryResult qr)
+    {
+        StringBuilder sb = new StringBuilder();
+        int rowNum = 1;
+        while (qr.hasNext())
+        {
+            // bug fix: rowNum was never incremented, so every row was labeled "@ Row 1"
+            sb.append("@ Row ").append(rowNum++).append('\n');
+            TableBuilder table = new TableBuilder('|');
+            Row next = qr.next();
+            for (String column : qr.names())
+            {
+                Object value = next.get(column);
+                table.add(column, value == null ? null : value.toString());
+            }
+            sb.append(table);
+        }
+        return sb.toString();
+    }
+
+    /** Fluent assertions over a single Row: exact column values and cross-column equality. */
+    public static class RowAssertHelper
+    {
+        private final Row row;
+
+        public RowAssertHelper(Row row)
+        {
+            this.row = row;
+        }
+
+        /** Asserts that the named column equals the expected value. */
+        public RowAssertHelper isEqualTo(String column, Object expected)
+        {
+            Object actual = row.get(column);
+            Assertions.assertThat(actual).describedAs("Column %s had 
unexpected value", column).isEqualTo(expected);
+            return this;
+        }
+
+        /** Asserts that every listed column holds the same value as the first column. */
+        public RowAssertHelper columnsEqualTo(String first, String... others)
+        {
+            Object expected = row.get(first);
+            for (String other : others)
+                
Assertions.assertThat(row.<Object>get(other)).describedAs("Columns %s and %s 
are not equal", first, other).isEqualTo(expected);
+            return this;
+        }
+    }
+
+    public static class SimpleQueryResultAssertHelper
     {
         private final SimpleQueryResult qr;
 
-        private AssertHelper(SimpleQueryResult qr)
+        private SimpleQueryResultAssertHelper(SimpleQueryResult qr)
         {
             this.qr = qr;
         }
 
-        public AssertHelper contains(Object... values)
+        public SimpleQueryResultAssertHelper contains(Object... values)
         {
             qr.reset();
             if (!QueryResultUtil.contains(qr, a -> QueryResultUtil.equals(a, 
values)))
@@ -91,7 +141,7 @@ public class QueryResultUtil
             return this;
         }
 
-        public AssertHelper contains(Row row)
+        public SimpleQueryResultAssertHelper contains(Row row)
         {
             qr.reset();
             if (!QueryResultUtil.contains(qr, a -> QueryResultUtil.equals(a, 
row)))
@@ -99,7 +149,7 @@ public class QueryResultUtil
             return this;
         }
 
-        public AssertHelper contains(Predicate<Row> fn)
+        public SimpleQueryResultAssertHelper contains(Predicate<Row> fn)
         {
             qr.reset();
             if (!QueryResultUtil.contains(qr, fn))
@@ -107,7 +157,7 @@ public class QueryResultUtil
             return this;
         }
 
-        public AssertHelper isEqualTo(Object... values)
+        public SimpleQueryResultAssertHelper isEqualTo(Object... values)
         {
             Assertions.assertThat(qr.toObjectArrays())
                       .hasSize(1)
@@ -115,13 +165,13 @@ public class QueryResultUtil
             return this;
         }
 
-        public AssertHelper hasSize(int size)
+        public SimpleQueryResultAssertHelper hasSize(int size)
         {
             Assertions.assertThat(qr.toObjectArrays()).hasSize(size);
             return this;
         }
 
-        public AssertHelper hasSizeGreaterThan(int size)
+        public SimpleQueryResultAssertHelper hasSizeGreaterThan(int size)
         {
             
Assertions.assertThat(qr.toObjectArrays()).hasSizeGreaterThan(size);
             return this;
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 6844cb8..66e9acf 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -1442,6 +1442,7 @@ public abstract class CQLTester
 
             Assert.assertEquals(String.format("Invalid number of (expected) 
values provided for row %d", i), expected == null ? 1 : expected.length, 
meta.size());
 
+            StringBuilder error = new StringBuilder();
             for (int j = 0; j < meta.size(); j++)
             {
                 ColumnSpecification column = meta.get(j);
@@ -1454,15 +1455,17 @@ public abstract class CQLTester
                 {
                     Object actualValueDecoded = actualValue == null ? null : 
column.type.getSerializer().deserialize(actualValue);
                     if (!Objects.equal(expected != null ? expected[j] : null, 
actualValueDecoded))
-                        Assert.fail(String.format("Invalid value for row %d 
column %d (%s of type %s), expected <%s> but got <%s>",
-                                                  i,
-                                                  j,
-                                                  column.name,
-                                                  column.type.asCQL3Type(),
-                                                  
formatValue(expectedByteValue != null ? expectedByteValue.duplicate() : null, 
column.type),
-                                                  formatValue(actualValue, 
column.type)));
+                        error.append(String.format("Invalid value for row %d 
column %d (%s of type %s), expected <%s> but got <%s>",
+                                                   i,
+                                                   j,
+                                                   column.name,
+                                                   column.type.asCQL3Type(),
+                                                   
formatValue(expectedByteValue != null ? expectedByteValue.duplicate() : null, 
column.type),
+                                                   formatValue(actualValue, 
column.type))).append("\n");
                 }
             }
+            if (error.length() > 0)
+                Assert.fail(error.toString());
             i++;
         }
 
diff --git a/test/unit/org/apache/cassandra/db/MmapFileTest.java 
b/test/unit/org/apache/cassandra/db/MmapFileTest.java
index a666426..4619df3 100644
--- a/test/unit/org/apache/cassandra/db/MmapFileTest.java
+++ b/test/unit/org/apache/cassandra/db/MmapFileTest.java
@@ -25,14 +25,24 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.File;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.io.util.FileUtils;
 
 public class MmapFileTest
 {
+    @BeforeClass
+    public static void setup()
+    {
+        // PathUtils touches StorageService which touches StreamManager which 
requires configs be setup
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+
     /**
      * Verifies that {@link sun.misc.Cleaner} works and that mmap'd files can 
be deleted.
      */
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java 
b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
index 3e28040..d28d0dc 100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@ -92,6 +92,8 @@ public class ColumnFilterTest
     @BeforeClass
     public static void beforeClass()
     {
+        // Gossiper touches StorageService which touches StreamManager which 
requires configs be setup
+        DatabaseDescriptor.daemonInitialization();
         DatabaseDescriptor.setSeedProvider(Arrays::asList);
         DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch());
         DatabaseDescriptor.setDefaultFailureDetector();
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java 
b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
new file mode 100644
index 0000000..270305a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.streaming.StreamingChannel;
+import org.apache.cassandra.streaming.StreamingState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.assertj.core.util.Throwables;
+
+public class StreamingVirtualTableTest extends CQLTester
+{
+    private static final String KS_NAME = "vts";
+    private static final InetAddressAndPort PEER1 = address(127, 0, 0, 1);
+    private static final InetAddressAndPort PEER2 = address(127, 0, 0, 2);
+    private static final InetAddressAndPort PEER3 = address(127, 0, 0, 3);
+    private static String TABLE_NAME;
+
+    @BeforeClass
+    public static void setup()
+    {
+        CQLTester.setUpClass();
+        StreamingVirtualTable table = new StreamingVirtualTable(KS_NAME);
+        TABLE_NAME = table.toString();
+        VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, 
ImmutableList.of(table)));
+    }
+
+    @Before
+    public void clearState()
+    {
+        StreamManager.instance.clearStates();
+    }
+
+    @Test
+    public void empty() throws Throwable
+    {
+        assertEmpty(execute(t("select * from %s")));
+    }
+
+    @Test
+    public void single() throws Throwable
+    {
+        StreamingState state = stream(true);
+        assertRows(execute(t("select id, follower, operation, peers, status, 
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
+                   new Object[] { state.id(), true, "Repair", 
Collections.emptyList(), "init", 0F, new Date(state.lastUpdatedAtMillis()), 
null, null });
+
+        state.phase.start();
+        assertRows(execute(t("select id, follower, operation, peers, status, 
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
+                   new Object[] { state.id(), true, "Repair", 
Collections.emptyList(), "start", 0F, new Date(state.lastUpdatedAtMillis()), 
null, null });
+
+        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.PREPARING)));
+
+        state.onSuccess(new StreamState(state.id(), StreamOperation.REPAIR, 
ImmutableSet.of(new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(), 
Collections.emptyList(), StreamSession.State.COMPLETE))));
+        assertRows(execute(t("select id, follower, operation, peers, status, 
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
+                   new Object[] { state.id(), true, "Repair", 
Arrays.asList(address(127, 0, 0, 2).toString()), "success", 100F, new 
Date(state.lastUpdatedAtMillis()), null, null });
+    }
+
+    @Test
+    public void progressInitiator() throws Throwable
+    {
+        progress(false);
+    }
+
+    @Test
+    public void progressFollower() throws Throwable
+    {
+        progress(true);
+    }
+
+    public void progress(boolean follower) throws Throwable
+    {
+        StreamingState state = stream(follower);
+        StreamResultFuture future = state.future();
+        state.phase.start();
+
+        SessionInfo s1 = new SessionInfo(PEER2, 0, 
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), 
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
+        SessionInfo s2 = new SessionInfo(PEER3, 0, 
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), 
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
+
+        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), s1));
+        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), s2));
+
+        long bytesToReceive = 0, bytesToSend = 0;
+        long filesToReceive = 0, filesToSend = 0;
+        for (SessionInfo s : Arrays.asList(s1, s2))
+        {
+            bytesToReceive += s.getTotalSizeToReceive();
+            bytesToSend += s.getTotalSizeToSend();
+            filesToReceive += s.getTotalFilesToReceive();
+            filesToSend += s.getTotalFilesToSend();
+        }
+        assertRows(execute(t("select id, follower, peers, status, 
progress_percentage, bytes_to_receive, bytes_received, bytes_to_send, 
bytes_sent, files_to_receive, files_received, files_to_send, files_sent from 
%s")),
+                   new Object[] { state.id(), follower, 
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", 0F, bytesToReceive, 
0L, bytesToSend, 0L, filesToReceive, 0L, filesToSend, 0L });
+
+        // update progress
+        long bytesReceived = 0, bytesSent = 0;
+        for (SessionInfo s : Arrays.asList(s1, s2))
+        {
+            long in = s.getTotalFilesToReceive() - 1;
+            long inBytes = s.getTotalSizeToReceive() - in;
+            long out = s.getTotalFilesToSend() - 1;
+            long outBytes = s.getTotalSizeToSend() - out;
+            state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), 
new ProgressInfo((InetAddressAndPort) s.peer, 0, "0", 
ProgressInfo.Direction.IN, inBytes, inBytes)));
+            state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), 
new ProgressInfo((InetAddressAndPort) s.peer, 0, "0", 
ProgressInfo.Direction.OUT, outBytes, outBytes)));
+            bytesReceived += inBytes;
+            bytesSent += outBytes;
+        }
+
+        assertRows(execute(t("select id, follower, peers, status, 
bytes_to_receive, bytes_received, bytes_to_send, bytes_sent, files_to_receive, 
files_received, files_to_send, files_sent from %s")),
+                   new Object[] { state.id(), follower, 
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", bytesToReceive, 
bytesReceived, bytesToSend, bytesSent, filesToReceive, 2L, filesToSend, 2L });
+
+        // finish
+        for (SessionInfo s : Arrays.asList(s1, s2))
+        {
+            // complete the rest
+            for (long i = 1; i < s.getTotalFilesToReceive(); i++)
+                state.handleStreamEvent(new 
StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) 
s.peer, 0, Long.toString(i), ProgressInfo.Direction.IN, 1, 1)));
+            for (long i = 1; i < s.getTotalFilesToSend(); i++)
+                state.handleStreamEvent(new 
StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) 
s.peer, 0, Long.toString(i), ProgressInfo.Direction.OUT, 1, 1)));
+        }
+
+        assertRows(execute(t("select id, follower, peers, status, 
progress_percentage, bytes_to_receive, bytes_received, bytes_to_send, 
bytes_sent, files_to_receive, files_received, files_to_send, files_sent from 
%s")),
+                   new Object[] { state.id(), follower, 
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", 99F, 
bytesToReceive, bytesToReceive, bytesToSend, bytesToSend, filesToReceive, 
filesToReceive, filesToSend, filesToSend });
+
+        state.onSuccess(future.getCurrentState());
+        assertRows(execute(t("select id, follower, peers, status, 
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
+                   new Object[] { state.id(), follower, 
Arrays.asList(PEER2.toString(), PEER3.toString()), "success", 100F, new 
Date(state.lastUpdatedAtMillis()), null, null });
+    }
+
+    private static StreamSummary streamSummary()
+    {
+        int files = ThreadLocalRandom.current().nextInt(2, 10);
+        return new StreamSummary(TableId.fromUUID(UUID.randomUUID()), files, 
files * 1024);
+    }
+
+    @Test
+    public void failed() throws Throwable
+    {
+        StreamingState state = stream(true);
+        RuntimeException t = new RuntimeException("You failed!");
+        state.onFailure(t);
+        assertRows(execute(t("select id, follower, peers, status, 
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
+                   new Object[] { state.id(), true, Collections.emptyList(), 
"failure", 100F, new Date(state.lastUpdatedAtMillis()), 
Throwables.getStackTrace(t), null });
+    }
+
+    private static String t(String query)
+    {
+        return String.format(query, TABLE_NAME);
+    }
+
+    private static StreamingState stream(boolean follower)
+    {
+        StreamResultFuture future = new StreamResultFuture(UUID.randomUUID(), 
StreamOperation.REPAIR, new StreamCoordinator(StreamOperation.REPAIR, 0, 
StreamingChannel.Factory.Global.streamingFactory(), follower, false, null, 
null) {
+            // initiator requires active sessions exist, else the future 
becomes success right away.
+            @Override
+            public synchronized boolean hasActiveSessions()
+            {
+                return true;
+            }
+        });
+        StreamingState state = new StreamingState(future);
+        if (follower) StreamManager.instance.putFollowerStream(future);
+        else StreamManager.instance.putInitiatorStream(future);
+        StreamManager.instance.putStreamingState(state);
+        future.addEventListener(state);
+        return state;
+    }
+
+    private static InetAddressAndPort address(int a, int b, int c, int d)
+    {
+        try
+        {
+            return InetAddressAndPort.getByAddress(new byte[] {(byte) a, 
(byte) b, (byte) c, (byte) d});
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/io/util/FileTest.java 
b/test/unit/org/apache/cassandra/io/util/FileTest.java
index 0fb415a..d7340db 100644
--- a/test/unit/org/apache/cassandra/io/util/FileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/FileTest.java
@@ -33,6 +33,7 @@ import com.google.common.util.concurrent.RateLimiter;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.psjava.util.Triple;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -51,6 +52,9 @@ public class FileTest
         dir = new java.io.File(parent, dirName); //checkstyle: permit this 
instantiation
         dir.mkdirs();
         new File(dir).deleteRecursiveOnExit();
+
+        // PathUtils touches StorageService which touches StreamManager which 
requires configs be setup
+        DatabaseDescriptor.daemonInitialization();
     }
 
 
diff --git 
a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
 
b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
index b70e514..11efc3c 100644
--- 
a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
+++ 
b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
@@ -29,10 +29,12 @@ import java.util.stream.Collectors;
 
 import org.junit.After;
 import org.junit.Assume;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.StartupChecksOptions;
 import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.io.util.File;
@@ -175,6 +177,13 @@ public abstract class AbstractFilesystemOwnershipCheckTest
         file.delete();
     }
 
    @BeforeClass
    public static void setupConfig()
    {
        // PathUtils touches StorageService which touches StreamManager which requires configs be setup.
        // Runs once before any subclass test so File/PathUtils usage below does not blow up.
        DatabaseDescriptor.daemonInitialization();
    }
+
     @After
     public void teardown() throws IOException
     {
diff --git a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java 
b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
index 86b63a2..5227e51 100644
--- a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
+++ b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
@@ -28,9 +28,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.File;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import net.openhft.chronicle.queue.ChronicleQueue;
@@ -60,6 +62,13 @@ public class BinLogTest
     private BinLog binLog;
     private Path path;
 
    @BeforeClass
    public static void setup()
    {
        // PathUtils touches StorageService which touches StreamManager which requires configs be setup.
        // Must happen before setUp() creates temp paths via the File/PathUtils machinery.
        DatabaseDescriptor.daemonInitialization();
    }
+
     @Before
     public void setUp() throws Exception
     {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to