This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4444721b6d Provide summary of failed SessionInfo's in 
StreamResultFuture
4444721b6d is described below

commit 4444721b6de555352bf0ac3ef7e36f94dc832f41
Author: Natnael Adere <[email protected]>
AuthorDate: Fri Mar 24 09:28:37 2023 -0700

    Provide summary of failed SessionInfo's in StreamResultFuture
    
    patch by Natnael Adere; reviewed by David Capwell, Dinesh Joshi, Jon 
Meredith for CASSANDRA-17199
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |   5 +
 .../apache/cassandra/streaming/SessionInfo.java    |  13 +-
 .../apache/cassandra/streaming/StreamManager.java  |  24 ++-
 .../cassandra/streaming/StreamResultFuture.java    |  12 +-
 .../apache/cassandra/streaming/StreamSession.java  |  80 ++++++---
 .../apache/cassandra/streaming/StreamState.java    |   4 +
 .../apache/cassandra/streaming/StreamingState.java |  20 ++-
 .../management/SessionInfoCompositeData.java       |   2 +-
 .../org/apache/cassandra/utils/ObjectSizes.java    |  21 +++
 src/java/org/apache/cassandra/utils/TimeUUID.java  |   1 +
 .../test/streaming/AbstractStreamFailureLogs.java  | 175 ++++++++++++++++++++
 .../test/streaming/BoundExceptionTest.java         | 165 +++++++++++++++++++
 ...amFailureLogsFailureDueToSessionFailedTest.java |  32 ++++
 ...mFailureLogsFailureDueToSessionTimeoutTest.java | 181 +++++++++++++++++++++
 ...amFailureLogsFailureInTheMiddleWithEOFTest.java |  32 ++++
 ...ilureLogsFailureInTheMiddleWithUnknownTest.java |  32 ++++
 .../microbench/ZeroCopyStreamingBenchmark.java     |   2 +-
 .../CassandraEntireSSTableStreamWriterTest.java    |   2 +-
 ...TableStreamConcurrentComponentMutationTest.java |   2 +-
 .../db/virtual/StreamingVirtualTableTest.java      |   8 +-
 ...ntireSSTableStreamingCorrectFilesCountTest.java |   3 +-
 .../cassandra/streaming/SessionInfoTest.java       |   2 +-
 .../cassandra/tools/nodetool/NetStatsTest.java     |   3 +-
 25 files changed, 780 insertions(+), 44 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index edacdfc568..01918fe861 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0
+ * Provide summary of failed SessionInfo's in StreamResultFuture 
(CASSANDRA-17199)
  * CEP-20: Dynamic Data Masking (CASSANDRA-17940)
  * Add system_views.snapshots virtual table (CASSANDRA-18102)
  * Update OpenHFT dependencies (chronicle-queue, chronicle-core, 
chronicle-bytes, chronicle-wire, chronicle-threads) (CASSANDRA-18049)
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 91cae27b20..839e4c2c7e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -165,6 +165,8 @@ public class Config
     @Replaces(oldName = "slow_query_log_timeout_in_ms", converter = 
Converters.MILLIS_DURATION_LONG, deprecated = true)
     public volatile DurationSpec.LongMillisecondsBound slow_query_log_timeout 
= new DurationSpec.LongMillisecondsBound("500ms");
 
+    public volatile DurationSpec.LongMillisecondsBound 
stream_transfer_task_timeout = new DurationSpec.LongMillisecondsBound("12h");
+
     public volatile double phi_convict_threshold = 8.0;
 
     public int concurrent_reads = 32;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 02b0038e89..33b229e106 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3510,6 +3510,11 @@ public class DatabaseDescriptor
         return conf.stream_entire_sstables;
     }
 
+    public static DurationSpec.LongMillisecondsBound 
getStreamTransferTaskTimeout()
+    {
+        return conf.stream_transfer_task_timeout;
+    }
+
     public static boolean getSkipStreamDiskSpaceCheck()
     {
         return conf.skip_stream_disk_space_check;
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java 
b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index d95d85bbe5..92af245988 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -47,12 +47,15 @@ public final class SessionInfo implements Serializable
     private final Map<String, ProgressInfo> receivingFiles = new 
ConcurrentHashMap<>();
     private final Map<String, ProgressInfo> sendingFiles = new 
ConcurrentHashMap<>();
 
+    public final String failureReason;
+
     public SessionInfo(InetSocketAddress peer,
                        int sessionIndex,
                        InetSocketAddress connecting,
                        Collection<StreamSummary> receivingSummaries,
                        Collection<StreamSummary> sendingSummaries,
-                       StreamSession.State state)
+                       StreamSession.State state,
+                       String failureReason)
     {
         this.peer = peer;
         this.sessionIndex = sessionIndex;
@@ -60,11 +63,12 @@ public final class SessionInfo implements Serializable
         this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
         this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
         this.state = state;
+        this.failureReason =  failureReason;
     }
 
     public SessionInfo(SessionInfo other)
     {
-        this(other.peer, other.sessionIndex, other.connecting, 
other.receivingSummaries, other.sendingSummaries, other.state);
+        this(other.peer, other.sessionIndex, other.connecting, 
other.receivingSummaries, other.sendingSummaries, other.state, 
other.failureReason);
     }
 
     public boolean isFailed()
@@ -205,4 +209,9 @@ public final class SessionInfo implements Serializable
     {
         return new SessionSummary(FBUtilities.getBroadcastAddressAndPort(), 
peer, receivingSummaries, sendingSummaries);
     }
+
+    public String getFailureReason()
+    {
+        return failureReason;
+    }
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java 
b/src/java/org/apache/cassandra/streaming/StreamManager.java
index e49cc4d53c..fec8b2de21 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -31,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.Weigher;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
@@ -252,18 +253,35 @@ public class StreamManager implements StreamManagerMBean
         }
     };
 
+    protected void addStreamingStateAgain(StreamingState state)
+    {
+        if (!DatabaseDescriptor.getStreamingStatsEnabled())
+            return;
+        states.put(state.id(), state);
+    }
+
     public StreamManager()
     {
         DurationSpec.LongNanosecondsBound duration = 
DatabaseDescriptor.getStreamingStateExpires();
         long sizeBytes = DatabaseDescriptor.getStreamingStateSize().toBytes();
-        long numElements = sizeBytes / StreamingState.ELEMENT_SIZE;
-        logger.info("Storing streaming state for {} or for {} elements", 
duration, numElements);
+        logger.info("Storing streaming state for {} or for size {}", duration, 
sizeBytes);
         states = CacheBuilder.newBuilder()
                              .expireAfterWrite(duration.quantity(), 
duration.unit())
-                             .maximumSize(numElements)
+                             .maximumWeight(sizeBytes)
+                             .weigher(new StreamingStateWeigher())
                              .build();
     }
 
+    private static class StreamingStateWeigher implements 
Weigher<TimeUUID,StreamingState>
+    {
+        @Override
+        public int weigh(TimeUUID key, StreamingState val)
+        {
+            long costOfStreamingState = val.unsharedHeapSize() + 
TimeUUID.TIMEUUID_SIZE;
+            return Math.toIntExact(costOfStreamingState);
+        }
+    }
+
     public void start()
     {
         addListener(listener);
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index b43203dee3..5277b9de73 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -239,8 +239,16 @@ public final class StreamResultFuture extends 
AsyncFuture<StreamState>
             StreamState finalState = getCurrentState();
             if (finalState.hasFailedSession())
             {
-                logger.warn("[Stream #{}] Stream failed", planId);
-                tryFailure(new StreamException(finalState, "Stream failed"));
+                StringBuilder stringBuilder = new StringBuilder();
+                stringBuilder.append("Stream failed: ");
+                for (SessionInfo info : finalState.sessions())
+                {
+                    if (info.isFailed())
+                        stringBuilder.append("\nSession peer 
").append(info.peer).append(' ').append(info.failureReason);
+                }
+                String message = stringBuilder.toString();
+                logger.warn("[Stream #{}] {}", planId, message);
+                tryFailure(new StreamException(finalState, message));
             }
             else if (finalState.hasAbortedSession())
             {
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 3917e1a5fd..a0e9b2d079 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -58,7 +59,6 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.CompactionStrategyManager;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.RangesAtEndpoint;
@@ -154,9 +154,10 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
  * (via {@link org.apache.cassandra.net.MessagingService}, while the actual 
files themselves are sent by a special
  * "streaming" connection type. See {@link StreamingMultiplexedChannel} for 
details. Because of the asynchronous
  */
-public class StreamSession implements IEndpointStateChangeSubscriber
+public class StreamSession
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamSession.class);
+    private static final int DEBUG_STACKTRACE_LIMIT = 
Integer.parseInt(System.getProperty("cassandra.streaming.debug_stacktrace_limit",
 "2"));
 
     public enum PrepareDirection { SEND, ACK }
 
@@ -203,6 +204,8 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     private final TimeUUID pendingRepair;
     private final PreviewKind previewKind;
 
+    public String failureReason;
+
 /**
  * State Transition:
  *
@@ -514,13 +517,20 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         }
     }
 
-    private synchronized Future<?> closeSession(State finalState)
+    private Future<?> closeSession(State finalState)
+    {
+        return closeSession(finalState, null);
+    }
+
+    private synchronized Future<?> closeSession(State finalState, String 
failureReason)
     {
         // it's session is already closed
         if (closeFuture != null)
             return closeFuture;
 
         state(finalState);
+        //this refers to StreamInfo
+        this.failureReason = failureReason;
 
         List<Future<?>> futures = new ArrayList<>();
 
@@ -673,7 +683,6 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
             if (state.finalState)
             {
                 logger.debug("[Stream #{}] Socket closed after session 
completed with state {}", planId(), state);
-
                 return null;
             }
             else
@@ -682,8 +691,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                              planId(),
                              peer.getHostAddressAndPort(),
                              e);
-
-                return closeSession(State.FAILED);
+                return closeSession(State.FAILED, "Failed because there was an 
" + e.getClass().getCanonicalName() + " with state=" + state.name());
             }
         }
 
@@ -694,8 +702,9 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
             state(State.FAILED); // make sure subsequent error handling sees 
the session in a final state 
             channel.sendControlMessage(new 
SessionFailedMessage()).awaitUninterruptibly();
         }
-
-        return closeSession(State.FAILED);
+        StringBuilder failureReason = new StringBuilder("Failed because of an 
unknown exception\n");
+        boundStackTrace(e, DEBUG_STACKTRACE_LIMIT, failureReason);
+        return closeSession(State.FAILED, failureReason.toString());
     }
 
     private void logError(Throwable e)
@@ -981,7 +990,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         StreamTransferTask task = transfers.get(message.header.tableId);
         if (task != null)
         {
-            task.scheduleTimeout(message.header.sequenceNumber, 12, 
TimeUnit.HOURS);
+            task.scheduleTimeout(message.header.sequenceNumber, 
DatabaseDescriptor.getStreamTransferTaskTimeout().toMilliseconds(), 
TimeUnit.MILLISECONDS);
         }
     }
 
@@ -1107,7 +1116,9 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     public synchronized void sessionFailed()
     {
         logger.error("[Stream #{}] Remote peer {} failed stream session.", 
planId(), peer.toString());
-        closeSession(State.FAILED);
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("Remote peer ").append(peer).append(" failed 
stream session");
+        closeSession(State.FAILED, stringBuilder.toString());
     }
 
     /**
@@ -1116,7 +1127,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     public synchronized void sessionTimeout()
     {
         logger.error("[Stream #{}] timeout with {}.", planId(), 
peer.toString());
-        closeSession(State.FAILED);
+        closeSession(State.FAILED, "Session timed out");
     }
 
     /**
@@ -1130,7 +1141,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         List<StreamSummary> transferSummaries = Lists.newArrayList();
         for (StreamTask transfer : transfers.values())
             transferSummaries.add(transfer.getSummary());
-        return new SessionInfo(channel.peer(), index, channel.connectedTo(), 
receivingSummaries, transferSummaries, state);
+        return new SessionInfo(channel.peer(), index, channel.connectedTo(), 
receivingSummaries, transferSummaries, state, failureReason);
     }
 
     public synchronized void taskCompleted(StreamReceiveTask completedTask)
@@ -1145,18 +1156,6 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         maybeCompleted();
     }
 
-    public void onRemove(InetAddressAndPort endpoint)
-    {
-        logger.error("[Stream #{}] Session failed because remote peer {} has 
left.", planId(), peer.toString());
-        closeSession(State.FAILED);
-    }
-
-    public void onRestart(InetAddressAndPort endpoint, EndpointState epState)
-    {
-        logger.error("[Stream #{}] Session failed because remote peer {} was 
restarted.", planId(), peer.toString());
-        closeSession(State.FAILED);
-    }
-
     private void completePreview()
     {
         try
@@ -1340,4 +1339,37 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                ", state=" + state +
                '}';
     }
+
+    public static StringBuilder boundStackTrace(Throwable e, int limit, 
StringBuilder out)
+    {
+        Set<Throwable> visited = Collections.newSetFromMap(new 
IdentityHashMap<>());
+        return boundStackTrace(e, limit, limit, visited, out);
+    }
+
+    public static StringBuilder boundStackTrace(Throwable e, int limit, int 
counter, Set<Throwable> visited, StringBuilder out)
+    {
+        if (e == null)
+            return out;
+
+        if (!visited.add(e))
+            return out.append("[CIRCULAR REFERENCE: 
").append(e.getClass().getName()).append(": 
").append(e.getMessage()).append("]").append('\n');
+        visited.add(e);
+
+        StackTraceElement[] stackTrace = e.getStackTrace();
+        out.append(e.getClass().getName() + ": " + 
e.getMessage()).append('\n');
+
+        // When dealing with the leaf, ignore how many stack traces were 
already written, and allow the max.
+        // This is here as the leaf tends to show where the issue started, so 
tends to be impactful for debugging
+        if (e.getCause() == null)
+            counter = limit;
+
+        for (int i = 0, size = Math.min(e.getStackTrace().length, limit); i < 
size && counter > 0; i++)
+        {
+            out.append('\t').append(stackTrace[i]).append('\n');
+            counter--;
+        }
+
+        boundStackTrace(e.getCause(), limit, counter, visited, out);
+        return out;
+    }
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamState.java 
b/src/java/org/apache/cassandra/streaming/StreamState.java
index 88eb76df43..69ba6988ed 100644
--- a/src/java/org/apache/cassandra/streaming/StreamState.java
+++ b/src/java/org/apache/cassandra/streaming/StreamState.java
@@ -56,4 +56,8 @@ public class StreamState implements Serializable
     {
         return Lists.newArrayList(Iterables.transform(sessions, 
SessionInfo::createSummary));
     }
+
+    public Set<SessionInfo> sessions() {
+        return sessions;
+    }
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamingState.java 
b/src/java/org/apache/cassandra/streaming/StreamingState.java
index c2eed1ea9e..aaeceb2fcf 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingState.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingState.java
@@ -34,6 +34,7 @@ import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.virtual.SimpleDataSet;
 import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
 import org.apache.cassandra.utils.Clock;
@@ -43,11 +44,11 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 
 import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
 
-public class StreamingState implements StreamEventHandler
+public class StreamingState implements StreamEventHandler, IMeasurableMemory
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamingState.class);
 
-    public static final long ELEMENT_SIZE = ObjectSizes.measureDeep(new 
StreamingState(nextTimeUUID(), StreamOperation.OTHER, false));
+    public static final long EMPTY = ObjectSizes.measureDeep(new 
StreamingState(nextTimeUUID(), StreamOperation.OTHER, false));
 
     public enum Status
     {INIT, START, SUCCESS, FAILURE}
@@ -70,6 +71,14 @@ public class StreamingState implements StreamEventHandler
     // API for state changes
     public final Phase phase = new Phase();
 
+    @Override
+    public long unsharedHeapSize()
+    {
+        long costOfPeers = peers().size() * 
(ObjectSizes.IPV6_SOCKET_ADDRESS_SIZE + 48); // 48 represents the datastructure 
cost computed by the JOL
+        long costOfCompleteMessage = ObjectSizes.sizeOf(completeMessage());
+        return costOfPeers + costOfCompleteMessage + EMPTY;
+    }
+
     public StreamingState(StreamResultFuture result)
     {
         this(result.planId, result.streamOperation, 
result.getCoordinator().isFollower());
@@ -104,6 +113,11 @@ public class StreamingState implements StreamEventHandler
         return this.peers;
     }
 
+    public String completeMessage()
+    {
+        return this.completeMessage;
+    }
+
     public Status status()
     {
         return status;
@@ -283,6 +297,8 @@ public class StreamingState implements StreamEventHandler
     {
         completeMessage = Throwables.getStackTraceAsString(throwable);
         updateState(Status.FAILURE);
+        //we know the size is now very different from the estimate so 
recompute by adding again
+        StreamManager.instance.addStreamingStateAgain(this);
     }
 
     private synchronized void updateState(Status state)
diff --git 
a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
 
b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index d7a78e39f6..eaa37bbca8 100644
--- 
a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++ 
b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -151,7 +151,7 @@ public class SessionInfoCompositeData
                                            connecting,
                                            
fromArrayOfCompositeData((CompositeData[]) values[5], toStreamSummary),
                                            
fromArrayOfCompositeData((CompositeData[]) values[6], toStreamSummary),
-                                           
StreamSession.State.valueOf((String) values[7]));
+                                           
StreamSession.State.valueOf((String) values[7]), null); // null is here to 
maintain backwards compatibility
         Function<CompositeData, ProgressInfo> toProgressInfo = new 
Function<CompositeData, ProgressInfo>()
         {
             public ProgressInfo apply(CompositeData input)
diff --git a/src/java/org/apache/cassandra/utils/ObjectSizes.java 
b/src/java/org/apache/cassandra/utils/ObjectSizes.java
index 9e1b407f1d..2d94983d18 100644
--- a/src/java/org/apache/cassandra/utils/ObjectSizes.java
+++ b/src/java/org/apache/cassandra/utils/ObjectSizes.java
@@ -19,6 +19,9 @@
 
 package org.apache.cassandra.utils;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
 import org.github.jamm.MemoryLayoutSpecification;
@@ -39,6 +42,8 @@ public class ObjectSizes
 
     private static final long DIRECT_BUFFER_HEAP_SIZE = 
measure(ByteBuffer.allocateDirect(0));
 
+    public static final long IPV6_SOCKET_ADDRESS_SIZE = 
ObjectSizes.measureDeep(new InetSocketAddress(getIpvAddress(16), 42));
+
     /**
      * Memory a byte array consumes
      *
@@ -236,4 +241,20 @@ public class ObjectSizes
     {
         return meter.measure(pojo);
     }
+
+    private static InetAddress getIpvAddress(int size)
+    {
+        if (size == 16 || size ==4)
+        {
+            try
+            {
+                return InetAddress.getByAddress(new byte[size]);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new IllegalArgumentException("Invalid size of a byte 
array when getting and ipv address: " + size);
+            }
+        }
+        else throw new IllegalArgumentException("Excpected a byte array size 
of 4 or 16 for an ipv address but got: " + size);
+    }
 }
diff --git a/src/java/org/apache/cassandra/utils/TimeUUID.java 
b/src/java/org/apache/cassandra/utils/TimeUUID.java
index 8d79096ecd..b944d843d4 100644
--- a/src/java/org/apache/cassandra/utils/TimeUUID.java
+++ b/src/java/org/apache/cassandra/utils/TimeUUID.java
@@ -82,6 +82,7 @@ public class TimeUUID implements Serializable, 
Comparable<TimeUUID>
     private static final long MIN_CLOCK_SEQ_AND_NODE = 0x8080808080808080L;
     private static final long MAX_CLOCK_SEQ_AND_NODE = 0x7f7f7f7f7f7f7f7fL;
 
+    public static final long TIMEUUID_SIZE = ObjectSizes.measureDeep(new 
TimeUUID(10, 10));
 
     final long uuidTimestamp, lsb;
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/AbstractStreamFailureLogs.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/AbstractStreamFailureLogs.java
new file mode 100644
index 0000000000..28aa6b0bcc
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/AbstractStreamFailureLogs.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableZeroCopyWriter;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.assertj.core.api.Assertions;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class AbstractStreamFailureLogs extends TestBaseImpl
+{
+    protected static final Logger logger = 
LoggerFactory.getLogger(AbstractStreamFailureLogs.class);
+
+    protected static final int FAILING_NODE = 2;
+
+    protected void streamTest(boolean zeroCopyStreaming, String reason, 
Integer failedNode) throws IOException
+    {
+        try (Cluster cluster = Cluster.build(2)
+                                      
.withInstanceInitializer(BBStreamHelper::install)
+                                      .withConfig(c -> c.with(Feature.values())
+                                                        
.set("stream_entire_sstables", zeroCopyStreaming)
+                                                        // when die, this will 
try to halt JVM, which is easier to validate in the test
+                                                        // other levels 
require checking state of the subsystems
+                                                        
.set("disk_failure_policy", "die"))
+                                      .start())
+        {
+            init(cluster);
+
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int 
PRIMARY KEY)"));
+
+            triggerStreaming(cluster);
+            // make sure disk failure policy is not triggered
+
+            IInvokableInstance failingNode = cluster.get(failedNode);
+
+            searchForLog(failingNode, reason);
+        }
+    }
+
+    protected void triggerStreaming(Cluster cluster)
+    {
+        IInvokableInstance node1 = cluster.get(1);
+        IInvokableInstance node2 = cluster.get(2);
+
+        // repair will do streaming IFF there is a mismatch; so cause one
+        for (int i = 0; i < 10; i++)
+            node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES 
(?)"), i); // timestamp won't match, causing a mismatch
+
+        // trigger streaming; expected to fail as streaming socket closed in 
the middle (currently this is an unrecoverable event)
+        //Blocks until the stream is complete
+        node2.nodetoolResult("repair", "-full", KEYSPACE, 
"tbl").asserts().failure();
+    }
+
+    protected void searchForLog(IInvokableInstance failingNode, String reason)
+    {
+        searchForLog(failingNode, true, reason);
+    }
+
+    protected boolean searchForLog(IInvokableInstance failingNode, boolean 
failIfNoMatch, String reason)
+    {
+        LogResult<List<String>> result = failingNode.logs().grepForErrors(-1, 
Pattern.compile("Stream failed:"));
+        // grepForErrors will include all ERROR logs even if they don't match 
the pattern; for this reason need to filter after the fact
+        List<String> matches = result.getResult();
+
+        matches = matches.stream().filter(s -> 
s.startsWith("WARN")).collect(Collectors.toList());
+        logger.info("Stream failed logs found: {}", String.join("\n", 
matches));
+        if (matches.isEmpty() && !failIfNoMatch)
+            return false;
+
+        Assertions.assertThat(matches)
+                  .describedAs("node%d expected to find %s but could not", 
failingNode.config().num(), reason)
+                  .hasSize(1);
+        String logLine = matches.get(0);
+        Assertions.assertThat(logLine).contains(reason);
+
+        Matcher match = Pattern.compile(".*\\[Stream 
#(.*)\\]").matcher(logLine);
+        if (!match.find()) throw new AssertionError("Unable to parse: " + 
logLine);
+        UUID planId = UUID.fromString(match.group(1));
+        SimpleQueryResult qr = failingNode.executeInternalWithResult("SELECT * 
FROM system_views.streaming WHERE id=?", planId);
+        Assertions.assertThat(qr.hasNext()).isTrue();
+        
Assertions.assertThat(qr.next().getString("failure_cause")).contains(reason);
+        return true;
+    }
+
+    public static class BBStreamHelper
+    {
+        @SuppressWarnings("unused")
+        public static int writeDirectlyToChannel(ByteBuffer buf, @SuperCall 
Callable<Integer> zuper) throws Exception
+        {
+            if (isCaller(SSTableZeroCopyWriter.class.getName(), "write"))
+                throw new RuntimeException("TEST");
+            // different context; pass through
+            return zuper.call();
+        }
+
+        @SuppressWarnings("unused")
+        public static boolean append(UnfilteredRowIterator partition, 
@SuperCall Callable<Boolean> zuper) throws Exception
+        {
+            if (isCaller(CassandraIncomingFile.class.getName(), "read")) // 
handles compressed and non-compressed
+                throw new java.nio.channels.ClosedChannelException();
+            // different context; pass through
+            return zuper.call();
+        }
+
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            if (num != FAILING_NODE)
+                return;
+            new ByteBuddy().rebase(SequentialWriter.class)
+                           
.method(named("writeDirectlyToChannel").and(takesArguments(1)))
+                           
.intercept(MethodDelegation.to(BBStreamHelper.class))
+                           .make()
+                           .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+
+            new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
+                           .method(named("append").and(takesArguments(1)))
+                           
.intercept(MethodDelegation.to(BBStreamHelper.class))
+                           .make()
+                           .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+
+        }
+    }
+
+    protected static boolean isCaller(String klass, String method)
+    {
+        StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+        for (int i = 0; i < stack.length; i++)
+        {
+            StackTraceElement e = stack[i];
+            if (klass.equals(e.getClassName()) && 
method.equals(e.getMethodName()))
+                return true;
+        }
+        return false;
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/BoundExceptionTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/BoundExceptionTest.java
new file mode 100644
index 0000000000..23c445c99f
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/BoundExceptionTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import org.junit.Test;
+
+import org.apache.cassandra.streaming.StreamSession;
+
+import static org.junit.Assert.assertEquals;
+
/**
 * Unit tests for {@code StreamSession.boundStackTrace}, which renders an exception
 * chain into a StringBuilder while limiting the number of stack frames emitted
 * per exception.
 *
 * All exceptions are built with synthetic stack frames (see {@link #frames(int)})
 * so the expected output is deterministic: every frame points at
 * {@code BoundExceptionTest.method} and uses the frame's index as its line number.
 */
public class BoundExceptionTest
{
    // Frame limit passed to boundStackTrace in every test.
    private static final int LIMIT = 2;

    // A single exception with more frames than LIMIT: only the first LIMIT frames print.
    @Test
    public void testSingleException()
    {
        Throwable exceptionToTest = exception("test exception");
        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(exceptionToTest, LIMIT, new StringBuilder());

        String expectedStackTrace = "java.lang.RuntimeException: test exception\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n";

        assertEquals(expectedStackTrace,boundedStackTrace.toString());
    }

    // A three-deep cause chain. NOTE(review): the middle exception's frames do not
    // appear in the expected output — presumably boundStackTrace elides frames it has
    // already printed for an enclosing exception; confirm against StreamSession.
    @Test
    public void testNestedException()
    {
        Throwable exceptionToTest = exception(exception("the disk /foo/var is bad", exception("Bad disk somewhere")));
        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(exceptionToTest, LIMIT, new StringBuilder());

        String expectedStackTrace = "java.lang.RuntimeException: java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n" +
                                    "java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "java.lang.RuntimeException: Bad disk somewhere\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n";

        assertEquals(expectedStackTrace, boundedStackTrace.toString());
    }

    // Two exceptions that are each other's cause: rendering must terminate with a
    // "[CIRCULAR REFERENCE: ...]" marker instead of looping forever.
    @Test
    public void testExceptionCycle()
    {
        Exception e1 = exception("Test exception 1");
        Exception e2 = exception("Test exception 2");

        e1.initCause(e2);
        e2.initCause(e1);

        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(e1, LIMIT, new StringBuilder());
        String expectedStackTrace = "java.lang.RuntimeException: Test exception 1\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n" +
                                    "java.lang.RuntimeException: Test exception 2\n" +
                                    "[CIRCULAR REFERENCE: java.lang.RuntimeException: Test exception 1]\n";

        assertEquals(expectedStackTrace, boundedStackTrace.toString());
    }

    // An exception with no frames at all: only the header line is printed.
    @Test
    public void testEmptyStackTrace()
    {
        Throwable exceptionToTest = exception("there are words here", 0);

        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(exceptionToTest, LIMIT, new StringBuilder());
        String expectedStackTrace = "java.lang.RuntimeException: there are words here\n";

        assertEquals(expectedStackTrace,boundedStackTrace.toString());
    }

    // Outer and middle exceptions have zero frames; only the innermost cause,
    // which still has frames, contributes frame lines.
    @Test
    public void testEmptyNestedStackTrace()
    {
        Throwable exceptionToTest = exception(exception("the disk /foo/var is bad", exception("Bad disk somewhere"), 0), 0);

        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(exceptionToTest, LIMIT, new StringBuilder());
        String expectedStackTrace = "java.lang.RuntimeException: java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "java.lang.RuntimeException: Bad disk somewhere\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n";

        assertEquals(expectedStackTrace, boundedStackTrace.toString());
    }

    // The outermost exception has fewer frames (1) than LIMIT: its single frame is
    // printed in full rather than being padded or truncated.
    @Test
    public void testLimitLargerThanStackTrace()
    {
        Throwable exceptionToTest = exception(exception("the disk /foo/var is bad", exception("Bad disk somewhere")), 1);

        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(exceptionToTest, LIMIT, new StringBuilder());
        String expectedStackTrace = "java.lang.RuntimeException: java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "java.lang.RuntimeException: Bad disk somewhere\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n";

        assertEquals(expectedStackTrace, boundedStackTrace.toString());
    }

    // Builds `length` synthetic frames; frame i reports line number i, so expected
    // strings can reference frames by index (":0", ":1", ...).
    private static StackTraceElement[] frames(int length)
    {
        StackTraceElement[] frames = new StackTraceElement[length];
        for (int i = 0; i < length; i++)
            frames[i] = new StackTraceElement(BoundExceptionTest.class.getCanonicalName(), "method", BoundExceptionTest.class.getSimpleName() + ".java", i);
        return frames;
    }

    // Overloads below funnel into exception(String, Throwable, int); the default
    // frame count is LIMIT * 2 so truncation is always exercised unless a length
    // is given explicitly.
    private static RuntimeException exception(String msg)
    {
        return exception(msg, null);
    }

    private static RuntimeException exception(String msg, int length)
    {
        return exception(msg, null, length);
    }

    private static RuntimeException exception(Throwable cause)
    {
        return exception(null, cause);
    }

    private static RuntimeException exception(Throwable cause, int length)
    {
        return exception(null, cause, length);
    }

    private static RuntimeException exception(String msg, Throwable cause)
    {
        return exception(msg, cause, LIMIT * 2);
    }

    // Chooses the RuntimeException constructor matching the non-null arguments and
    // replaces its real stack trace with `length` synthetic frames.
    private static RuntimeException exception(String msg, Throwable cause, int length)
    {
        RuntimeException e;
        if (msg != null && cause != null) e = new RuntimeException(msg, cause);
        else if (msg != null) e = new RuntimeException(msg);
        else if (cause != null) e = new RuntimeException(cause);
        else e = new RuntimeException();
        e.setStackTrace(frames(length));
        return e;
    }
}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionFailedTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionFailedTest.java
new file mode 100644
index 0000000000..b04ea1e057
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionFailedTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+public class StreamFailureLogsFailureDueToSessionFailedTest extends 
AbstractStreamFailureLogs
+{
+    @Test
+    public void failureDueToSessionFailed() throws IOException
+    {
+        streamTest(true,"Remote peer /127.0.0.2:7012 failed stream session", 
1);
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionTimeoutTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionTimeoutTest.java
new file mode 100644
index 0000000000..b473597a5b
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionTimeoutTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.UncheckedTimeoutException;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableZeroCopyWriter;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.Shared;
+import org.awaitility.Awaitility;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
/**
 * Verifies that a stream session which makes no progress is failed with
 * "Session timed out" and that the cause is logged and surfaced in the
 * streaming virtual table.
 *
 * The failing node's streaming write path is intercepted with ByteBuddy so the
 * transfer blocks until the test releases it; with stream_transfer_task_timeout
 * set to 1ms the session is guaranteed to time out while blocked.
 */
public class StreamFailureLogsFailureDueToSessionTimeoutTest extends AbstractStreamFailureLogs
{
    @Test
    public void failureDueToSessionTimeout() throws IOException
    {
        try (Cluster cluster = Cluster.build(2)
                                      .withInstanceInitializer(BBStreamTimeoutHelper::install)
                                      .withConfig(c -> c.with(Feature.values())
                                                        // when die, this will try to halt JVM, which is easier to validate in the test
                                                        // other levels require checking state of the subsystems
                                                        .set("stream_transfer_task_timeout", "1ms"))
                                      .start())
        {

            init(cluster);
            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY)"));

            // run the stream in the background; it blocks inside the interceptor below
            ForkJoinPool.commonPool().execute(() -> triggerStreaming(cluster));
            try
            {
                // wait until the stream is actually blocked (or the timeout already fired)
                Awaitility.await("Did not see stream running or timed out")
                          .atMost(3, TimeUnit.MINUTES)
                          .until(() -> State.STREAM_IS_RUNNING.await(false) || searchForLog(cluster.get(1), false, "Session timed out"));
            }
            finally
            {
                // always release the blocked stream so the background task can finish
                State.UNBLOCK_STREAM.signal();
            }
            Awaitility.await("Unable to find 'Session timed out'")
                      .atMost(1, TimeUnit.MINUTES)
                      .until(() -> searchForLog(cluster.get(1), false, "Session timed out"));
        }
    }

    // Marked @Shared so the conditions are visible across the in-JVM dtest
    // classloaders, letting the test coordinate with instrumented node code.
    @Shared
    public static class State
    {
        public static final TestCondition STREAM_IS_RUNNING = new TestCondition();
        public static final TestCondition UNBLOCK_STREAM = new TestCondition();
    }

    /**
     * Minimal one-shot condition usable across classloaders: a volatile flag plus
     * wait/notify. Waiters poll in bounded slices so a signal that races with the
     * flag check (lost wakeup) delays a waiter by at most 500ms.
     */
    @Shared
    public static class TestCondition
    {
        private volatile boolean signaled = false;

        // Waits up to 1 minute and throws on timeout.
        public void await()
        {
            await(true);
        }

        /**
         * Waits up to 1 minute for {@link #signal()}.
         *
         * @param throwOnTimeout when true, times out with UncheckedTimeoutException;
         *                       when false, returns false on timeout instead
         * @return true once signaled, false only on timeout with throwOnTimeout=false
         */
        public boolean await(boolean throwOnTimeout)
        {
            long deadlineNanos = Clock.Global.nanoTime() + TimeUnit.MINUTES.toNanos(1);
            while (!signaled)
            {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - Clock.Global.nanoTime());
                if (remainingMillis <= 0)
                {
                    if (throwOnTimeout) throw new UncheckedTimeoutException("Condition not met within 1 minute");
                    return false;
                }
                // await may block signal from triggering notify, so make sure not to block for more than 500ms
                remainingMillis = Math.min(remainingMillis, 500);
                synchronized (this)
                {
                    try
                    {
                        this.wait(remainingMillis);
                    }
                    catch (InterruptedException e)
                    {
                        throw new AssertionError(e);
                    }
                }
            }
            return true;
        }

        // Sets the flag first so even a waiter that misses the notify observes it
        // on its next poll iteration.
        public void signal()
        {
            signaled = true;
            synchronized (this)
            {
                this.notify();
            }
        }
    }

    /**
     * ByteBuddy interceptors that pause the streaming write path on the failing node:
     * they announce the stream is running, then block until UNBLOCK_STREAM is signaled.
     * Calls from any other context pass straight through.
     */
    public static class BBStreamTimeoutHelper
    {
        @SuppressWarnings("unused")
        public static int writeDirectlyToChannel(ByteBuffer buf, @SuperCall Callable<Integer> zuper) throws Exception
        {
            if (isCaller(SSTableZeroCopyWriter.class.getName(), "write"))
            {
                State.STREAM_IS_RUNNING.signal();
                State.UNBLOCK_STREAM.await();
            }
            // different context; pass through
            return zuper.call();
        }

        @SuppressWarnings("unused")
        public static boolean append(UnfilteredRowIterator partition, @SuperCall Callable<Boolean> zuper) throws Exception
        {
            if (isCaller(CassandraIncomingFile.class.getName(), "read")) // handles compressed and non-compressed
            {
                State.STREAM_IS_RUNNING.signal();
                State.UNBLOCK_STREAM.await();
            }
            // different context; pass through
            return zuper.call();
        }

        // Rebases both streaming write paths, but only on the designated failing node.
        public static void install(ClassLoader classLoader, Integer num)
        {
            if (num != FAILING_NODE)
                return;
            new ByteBuddy().rebase(SequentialWriter.class)
                           .method(named("writeDirectlyToChannel").and(takesArguments(1)))
                           .intercept(MethodDelegation.to(BBStreamTimeoutHelper.class))
                           .make()
                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);

            new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
                           .method(named("append").and(takesArguments(1)))
                           .intercept(MethodDelegation.to(BBStreamTimeoutHelper.class))
                           .make()
                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
        }

    }
}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithEOFTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithEOFTest.java
new file mode 100644
index 0000000000..9b0f4f4dfd
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithEOFTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+public class StreamFailureLogsFailureInTheMiddleWithEOFTest extends 
AbstractStreamFailureLogs
+{
+    @Test
+    public void failureInTheMiddleWithEOF() throws IOException
+    {
+        streamTest(false, "Session peer /127.0.0.1:7012 Failed because there 
was an java.nio.channels.ClosedChannelException with state=STREAMING", 
FAILING_NODE);
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithUnknownTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithUnknownTest.java
new file mode 100644
index 0000000000..9d2507bbf6
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithUnknownTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+public class StreamFailureLogsFailureInTheMiddleWithUnknownTest extends 
AbstractStreamFailureLogs
+{
+    @Test
+    public void failureInTheMiddleWithUnknown() throws IOException
+    {
+        streamTest(true, "java.lang.RuntimeException: TEST", FAILING_NODE);
+    }
+}
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
 
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index b5bb40c3c1..64389221a1 100644
--- 
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -221,7 +221,7 @@ public class ZeroCopyStreamingBenchmark
             StreamResultFuture future = 
StreamResultFuture.createInitiator(nextTimeUUID(), StreamOperation.BOOTSTRAP, 
Collections.<StreamEventHandler>emptyList(), streamCoordinator);
 
             InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
-            streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.INITIALIZED));
+            streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.INITIALIZED, null));
 
             StreamSession session = 
streamCoordinator.getOrCreateOutboundSession(peer);
             session.init(future);
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
 
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index a65b2ea3a0..d0a81028cc 100644
--- 
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++ 
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -209,7 +209,7 @@ public class CassandraEntireSSTableStreamWriterTest
         StreamResultFuture future = 
StreamResultFuture.createInitiator(nextTimeUUID(), StreamOperation.BOOTSTRAP, 
Collections.<StreamEventHandler>emptyList(), streamCoordinator);
 
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
-        streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.INITIALIZED));
+        streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.INITIALIZED, null));
 
         StreamSession session = 
streamCoordinator.getOrCreateOutboundSession(peer);
         session.init(future);
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
 
b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
index 5733b2fa5a..8107fd9789 100644
--- 
a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
+++ 
b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
@@ -324,7 +324,7 @@ public class 
EntireSSTableStreamConcurrentComponentMutationTest
         StreamResultFuture future = 
StreamResultFuture.createInitiator(nextTimeUUID(), StreamOperation.BOOTSTRAP, 
Collections.emptyList(), streamCoordinator);
 
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
-        streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.INITIALIZED));
+        streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.INITIALIZED, null));
 
         StreamSession session = 
streamCoordinator.getOrCreateOutboundSession(peer);
         session.init(future);
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java 
b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
index c8e3d89f6b..56d01935ed 100644
--- a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
@@ -96,9 +96,9 @@ public class StreamingVirtualTableTest extends CQLTester
         assertRows(execute(t("select id, follower, operation, peers, status, 
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
                    new Object[] { state.id(), true, "Repair", 
Collections.emptyList(), "start", 0F, new Date(state.lastUpdatedAtMillis()), 
null, null });
 
-        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.PREPARING), StreamSession.PrepareDirection.ACK));
+        state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1, 
Collections.emptyList(), Collections.emptyList(), 
StreamSession.State.PREPARING, null), StreamSession.PrepareDirection.ACK));
 
-        state.onSuccess(new StreamState(state.id(), StreamOperation.REPAIR, 
ImmutableSet.of(new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(), 
Collections.emptyList(), StreamSession.State.COMPLETE))));
+        state.onSuccess(new StreamState(state.id(), StreamOperation.REPAIR, 
ImmutableSet.of(new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(), 
Collections.emptyList(), StreamSession.State.COMPLETE, null))));
         assertRows(execute(t("select id, follower, operation, peers, status, 
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
                    new Object[] { state.id(), true, "Repair", 
Arrays.asList(address(127, 0, 0, 2).toString()), "success", 100F, new 
Date(state.lastUpdatedAtMillis()), null, null });
     }
@@ -121,8 +121,8 @@ public class StreamingVirtualTableTest extends CQLTester
         StreamResultFuture future = state.future();
         state.phase.start();
 
-        SessionInfo s1 = new SessionInfo(PEER2, 0, 
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), 
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
-        SessionInfo s2 = new SessionInfo(PEER3, 0, 
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), 
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
+        SessionInfo s1 = new SessionInfo(PEER2, 0, 
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), 
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING, 
null);
+        SessionInfo s2 = new SessionInfo(PEER3, 0, 
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), 
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING, 
null);
 
         // we only update stats on ACK
         state.handleStreamEvent(new 
StreamEvent.SessionPreparedEvent(state.id(), s1, 
StreamSession.PrepareDirection.ACK));
diff --git 
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
 
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
index 34ce09c3f6..5a8f4a58cf 100644
--- 
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
@@ -212,7 +212,8 @@ public class EntireSSTableStreamingCorrectFilesCountTest
                                                          peer,
                                                          
Collections.emptyList(),
                                                          
Collections.emptyList(),
-                                                         
StreamSession.State.INITIALIZED));
+                                                         
StreamSession.State.INITIALIZED,
+                                                         null));
 
         StreamSession session = 
streamCoordinator.getOrCreateOutboundSession(peer);
         session.init(future);
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java 
b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index 45172fe14c..778b6b2d71 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -46,7 +46,7 @@ public class SessionInfoTest
         }
 
         StreamSummary sending = new StreamSummary(tableId, 10, 100);
-        SessionInfo info = new SessionInfo(local, 0, local, summaries, 
Collections.singleton(sending), StreamSession.State.PREPARING);
+        SessionInfo info = new SessionInfo(local, 0, local, summaries, 
Collections.singleton(sending), StreamSession.State.PREPARING, null);
 
         assert info.getTotalFilesToReceive() == 45;
         assert info.getTotalFilesToSend() == 10;
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/NetStatsTest.java 
b/test/unit/org/apache/cassandra/tools/nodetool/NetStatsTest.java
index f8bd530539..7bddc9b23a 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/NetStatsTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/NetStatsTest.java
@@ -117,7 +117,8 @@ public class NetStatsTest extends CQLTester
                                            InetAddressAndPort.getLocalHost(),
                                            streamSummaries,
                                            streamSummaries,
-                                           State.COMPLETE);
+                                           State.COMPLETE,
+                                           null);
 
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
PrintStream out = new PrintStream(baos))
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to