This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4444721b6d Provide summary of failed SessionInfo's in
StreamResultFuture
4444721b6d is described below
commit 4444721b6de555352bf0ac3ef7e36f94dc832f41
Author: Natnael Adere <[email protected]>
AuthorDate: Fri Mar 24 09:28:37 2023 -0700
Provide summary of failed SessionInfo's in StreamResultFuture
patch by Natnael Adere; reviewed by David Capwell, Dinesh Joshi, Jon
Meredith for CASSANDRA-17199
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 5 +
.../apache/cassandra/streaming/SessionInfo.java | 13 +-
.../apache/cassandra/streaming/StreamManager.java | 24 ++-
.../cassandra/streaming/StreamResultFuture.java | 12 +-
.../apache/cassandra/streaming/StreamSession.java | 80 ++++++---
.../apache/cassandra/streaming/StreamState.java | 4 +
.../apache/cassandra/streaming/StreamingState.java | 20 ++-
.../management/SessionInfoCompositeData.java | 2 +-
.../org/apache/cassandra/utils/ObjectSizes.java | 21 +++
src/java/org/apache/cassandra/utils/TimeUUID.java | 1 +
.../test/streaming/AbstractStreamFailureLogs.java | 175 ++++++++++++++++++++
.../test/streaming/BoundExceptionTest.java | 165 +++++++++++++++++++
...amFailureLogsFailureDueToSessionFailedTest.java | 32 ++++
...mFailureLogsFailureDueToSessionTimeoutTest.java | 181 +++++++++++++++++++++
...amFailureLogsFailureInTheMiddleWithEOFTest.java | 32 ++++
...ilureLogsFailureInTheMiddleWithUnknownTest.java | 32 ++++
.../microbench/ZeroCopyStreamingBenchmark.java | 2 +-
.../CassandraEntireSSTableStreamWriterTest.java | 2 +-
...TableStreamConcurrentComponentMutationTest.java | 2 +-
.../db/virtual/StreamingVirtualTableTest.java | 8 +-
...ntireSSTableStreamingCorrectFilesCountTest.java | 3 +-
.../cassandra/streaming/SessionInfoTest.java | 2 +-
.../cassandra/tools/nodetool/NetStatsTest.java | 3 +-
25 files changed, 780 insertions(+), 44 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index edacdfc568..01918fe861 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0
+ * Provide summary of failed SessionInfo's in StreamResultFuture
(CASSANDRA-17199)
* CEP-20: Dynamic Data Masking (CASSANDRA-17940)
* Add system_views.snapshots virtual table (CASSANDRA-18102)
* Update OpenHFT dependencies (chronicle-queue, chronicle-core,
chronicle-bytes, chronicle-wire, chronicle-threads) (CASSANDRA-18049)
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 91cae27b20..839e4c2c7e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -165,6 +165,8 @@ public class Config
@Replaces(oldName = "slow_query_log_timeout_in_ms", converter =
Converters.MILLIS_DURATION_LONG, deprecated = true)
public volatile DurationSpec.LongMillisecondsBound slow_query_log_timeout
= new DurationSpec.LongMillisecondsBound("500ms");
+ public volatile DurationSpec.LongMillisecondsBound
stream_transfer_task_timeout = new DurationSpec.LongMillisecondsBound("12h");
+
public volatile double phi_convict_threshold = 8.0;
public int concurrent_reads = 32;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 02b0038e89..33b229e106 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3510,6 +3510,11 @@ public class DatabaseDescriptor
return conf.stream_entire_sstables;
}
+ public static DurationSpec.LongMillisecondsBound
getStreamTransferTaskTimeout()
+ {
+ return conf.stream_transfer_task_timeout;
+ }
+
public static boolean getSkipStreamDiskSpaceCheck()
{
return conf.skip_stream_disk_space_check;
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java
b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index d95d85bbe5..92af245988 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -47,12 +47,15 @@ public final class SessionInfo implements Serializable
private final Map<String, ProgressInfo> receivingFiles = new
ConcurrentHashMap<>();
private final Map<String, ProgressInfo> sendingFiles = new
ConcurrentHashMap<>();
+ public final String failureReason;
+
public SessionInfo(InetSocketAddress peer,
int sessionIndex,
InetSocketAddress connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
- StreamSession.State state)
+ StreamSession.State state,
+ String failureReason)
{
this.peer = peer;
this.sessionIndex = sessionIndex;
@@ -60,11 +63,12 @@ public final class SessionInfo implements Serializable
this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
this.state = state;
+ this.failureReason = failureReason;
}
public SessionInfo(SessionInfo other)
{
- this(other.peer, other.sessionIndex, other.connecting,
other.receivingSummaries, other.sendingSummaries, other.state);
+ this(other.peer, other.sessionIndex, other.connecting,
other.receivingSummaries, other.sendingSummaries, other.state,
other.failureReason);
}
public boolean isFailed()
@@ -205,4 +209,9 @@ public final class SessionInfo implements Serializable
{
return new SessionSummary(FBUtilities.getBroadcastAddressAndPort(),
peer, receivingSummaries, sendingSummaries);
}
+
+ public String getFailureReason()
+ {
+ return failureReason;
+ }
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java
b/src/java/org/apache/cassandra/streaming/StreamManager.java
index e49cc4d53c..fec8b2de21 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -31,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.Weigher;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
@@ -252,18 +253,35 @@ public class StreamManager implements StreamManagerMBean
}
};
+ protected void addStreamingStateAgain(StreamingState state)
+ {
+ if (!DatabaseDescriptor.getStreamingStatsEnabled())
+ return;
+ states.put(state.id(), state);
+ }
+
public StreamManager()
{
DurationSpec.LongNanosecondsBound duration =
DatabaseDescriptor.getStreamingStateExpires();
long sizeBytes = DatabaseDescriptor.getStreamingStateSize().toBytes();
- long numElements = sizeBytes / StreamingState.ELEMENT_SIZE;
- logger.info("Storing streaming state for {} or for {} elements",
duration, numElements);
+ logger.info("Storing streaming state for {} or for size {}", duration,
sizeBytes);
states = CacheBuilder.newBuilder()
.expireAfterWrite(duration.quantity(),
duration.unit())
- .maximumSize(numElements)
+ .maximumWeight(sizeBytes)
+ .weigher(new StreamingStateWeigher())
.build();
}
+ private static class StreamingStateWeigher implements
Weigher<TimeUUID,StreamingState>
+ {
+ @Override
+ public int weigh(TimeUUID key, StreamingState val)
+ {
+ long costOfStreamingState = val.unsharedHeapSize() +
TimeUUID.TIMEUUID_SIZE;
+ return Math.toIntExact(costOfStreamingState);
+ }
+ }
+
public void start()
{
addListener(listener);
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index b43203dee3..5277b9de73 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -239,8 +239,16 @@ public final class StreamResultFuture extends
AsyncFuture<StreamState>
StreamState finalState = getCurrentState();
if (finalState.hasFailedSession())
{
- logger.warn("[Stream #{}] Stream failed", planId);
- tryFailure(new StreamException(finalState, "Stream failed"));
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("Stream failed: ");
+ for (SessionInfo info : finalState.sessions())
+ {
+ if (info.isFailed())
+ stringBuilder.append("\nSession peer
").append(info.peer).append(' ').append(info.failureReason);
+ }
+ String message = stringBuilder.toString();
+ logger.warn("[Stream #{}] {}", planId, message);
+ tryFailure(new StreamException(finalState, message));
}
else if (finalState.hasAbortedSession())
{
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 3917e1a5fd..a0e9b2d079 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -58,7 +59,6 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionStrategyManager;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
@@ -154,9 +154,10 @@ import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
* (via {@link org.apache.cassandra.net.MessagingService}, while the actual
files themselves are sent by a special
* "streaming" connection type. See {@link StreamingMultiplexedChannel} for
details. Because of the asynchronous
*/
-public class StreamSession implements IEndpointStateChangeSubscriber
+public class StreamSession
{
private static final Logger logger =
LoggerFactory.getLogger(StreamSession.class);
+ private static final int DEBUG_STACKTRACE_LIMIT =
Integer.parseInt(System.getProperty("cassandra.streaming.debug_stacktrace_limit",
"2"));
public enum PrepareDirection { SEND, ACK }
@@ -203,6 +204,8 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
private final TimeUUID pendingRepair;
private final PreviewKind previewKind;
+ public String failureReason;
+
/**
* State Transition:
*
@@ -514,13 +517,20 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
}
}
- private synchronized Future<?> closeSession(State finalState)
+ private Future<?> closeSession(State finalState)
+ {
+ return closeSession(finalState, null);
+ }
+
+ private synchronized Future<?> closeSession(State finalState, String
failureReason)
{
        // this session is already closed
if (closeFuture != null)
return closeFuture;
state(finalState);
+        // record why the session closed; surfaced to operators via SessionInfo
+        this.failureReason = failureReason;
List<Future<?>> futures = new ArrayList<>();
@@ -673,7 +683,6 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
if (state.finalState)
{
logger.debug("[Stream #{}] Socket closed after session
completed with state {}", planId(), state);
-
return null;
}
else
@@ -682,8 +691,7 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
planId(),
peer.getHostAddressAndPort(),
e);
-
- return closeSession(State.FAILED);
+ return closeSession(State.FAILED, "Failed because there was an
" + e.getClass().getCanonicalName() + " with state=" + state.name());
}
}
@@ -694,8 +702,9 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
state(State.FAILED); // make sure subsequent error handling sees
the session in a final state
channel.sendControlMessage(new
SessionFailedMessage()).awaitUninterruptibly();
}
-
- return closeSession(State.FAILED);
+ StringBuilder failureReason = new StringBuilder("Failed because of an
unknown exception\n");
+ boundStackTrace(e, DEBUG_STACKTRACE_LIMIT, failureReason);
+ return closeSession(State.FAILED, failureReason.toString());
}
private void logError(Throwable e)
@@ -981,7 +990,7 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
StreamTransferTask task = transfers.get(message.header.tableId);
if (task != null)
{
- task.scheduleTimeout(message.header.sequenceNumber, 12,
TimeUnit.HOURS);
+ task.scheduleTimeout(message.header.sequenceNumber,
DatabaseDescriptor.getStreamTransferTaskTimeout().toMilliseconds(),
TimeUnit.MILLISECONDS);
}
}
@@ -1107,7 +1116,9 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
public synchronized void sessionFailed()
{
logger.error("[Stream #{}] Remote peer {} failed stream session.",
planId(), peer.toString());
- closeSession(State.FAILED);
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("Remote peer ").append(peer).append(" failed
stream session");
+ closeSession(State.FAILED, stringBuilder.toString());
}
/**
@@ -1116,7 +1127,7 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
public synchronized void sessionTimeout()
{
logger.error("[Stream #{}] timeout with {}.", planId(),
peer.toString());
- closeSession(State.FAILED);
+ closeSession(State.FAILED, "Session timed out");
}
/**
@@ -1130,7 +1141,7 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
List<StreamSummary> transferSummaries = Lists.newArrayList();
for (StreamTask transfer : transfers.values())
transferSummaries.add(transfer.getSummary());
- return new SessionInfo(channel.peer(), index, channel.connectedTo(),
receivingSummaries, transferSummaries, state);
+ return new SessionInfo(channel.peer(), index, channel.connectedTo(),
receivingSummaries, transferSummaries, state, failureReason);
}
public synchronized void taskCompleted(StreamReceiveTask completedTask)
@@ -1145,18 +1156,6 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
maybeCompleted();
}
- public void onRemove(InetAddressAndPort endpoint)
- {
- logger.error("[Stream #{}] Session failed because remote peer {} has
left.", planId(), peer.toString());
- closeSession(State.FAILED);
- }
-
- public void onRestart(InetAddressAndPort endpoint, EndpointState epState)
- {
- logger.error("[Stream #{}] Session failed because remote peer {} was
restarted.", planId(), peer.toString());
- closeSession(State.FAILED);
- }
-
private void completePreview()
{
try
@@ -1340,4 +1339,37 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
", state=" + state +
'}';
}
+
+ public static StringBuilder boundStackTrace(Throwable e, int limit,
StringBuilder out)
+ {
+ Set<Throwable> visited = Collections.newSetFromMap(new
IdentityHashMap<>());
+ return boundStackTrace(e, limit, limit, visited, out);
+ }
+
+ public static StringBuilder boundStackTrace(Throwable e, int limit, int
counter, Set<Throwable> visited, StringBuilder out)
+ {
+ if (e == null)
+ return out;
+
+ if (!visited.add(e))
+ return out.append("[CIRCULAR REFERENCE:
").append(e.getClass().getName()).append(":
").append(e.getMessage()).append("]").append('\n');
+ visited.add(e);
+
+ StackTraceElement[] stackTrace = e.getStackTrace();
+ out.append(e.getClass().getName() + ": " +
e.getMessage()).append('\n');
+
+        // For the leaf (root cause), ignore how many stack frames were
already written and allow up to the full limit again.
+        // The root cause usually shows where the problem originated, so
its frames are the most valuable for debugging.
+ if (e.getCause() == null)
+ counter = limit;
+
+ for (int i = 0, size = Math.min(e.getStackTrace().length, limit); i <
size && counter > 0; i++)
+ {
+ out.append('\t').append(stackTrace[i]).append('\n');
+ counter--;
+ }
+
+ boundStackTrace(e.getCause(), limit, counter, visited, out);
+ return out;
+ }
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamState.java
b/src/java/org/apache/cassandra/streaming/StreamState.java
index 88eb76df43..69ba6988ed 100644
--- a/src/java/org/apache/cassandra/streaming/StreamState.java
+++ b/src/java/org/apache/cassandra/streaming/StreamState.java
@@ -56,4 +56,8 @@ public class StreamState implements Serializable
{
return Lists.newArrayList(Iterables.transform(sessions,
SessionInfo::createSummary));
}
+
+ public Set<SessionInfo> sessions() {
+ return sessions;
+ }
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamingState.java
b/src/java/org/apache/cassandra/streaming/StreamingState.java
index c2eed1ea9e..aaeceb2fcf 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingState.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingState.java
@@ -34,6 +34,7 @@ import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.virtual.SimpleDataSet;
import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
import org.apache.cassandra.utils.Clock;
@@ -43,11 +44,11 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
-public class StreamingState implements StreamEventHandler
+public class StreamingState implements StreamEventHandler, IMeasurableMemory
{
private static final Logger logger =
LoggerFactory.getLogger(StreamingState.class);
- public static final long ELEMENT_SIZE = ObjectSizes.measureDeep(new
StreamingState(nextTimeUUID(), StreamOperation.OTHER, false));
+ public static final long EMPTY = ObjectSizes.measureDeep(new
StreamingState(nextTimeUUID(), StreamOperation.OTHER, false));
public enum Status
{INIT, START, SUCCESS, FAILURE}
@@ -70,6 +71,14 @@ public class StreamingState implements StreamEventHandler
// API for state changes
public final Phase phase = new Phase();
+ @Override
+ public long unsharedHeapSize()
+ {
+ long costOfPeers = peers().size() *
(ObjectSizes.IPV6_SOCKET_ADDRESS_SIZE + 48); // 48 represents the datastructure
cost computed by the JOL
+ long costOfCompleteMessage = ObjectSizes.sizeOf(completeMessage());
+ return costOfPeers + costOfCompleteMessage + EMPTY;
+ }
+
public StreamingState(StreamResultFuture result)
{
this(result.planId, result.streamOperation,
result.getCoordinator().isFollower());
@@ -104,6 +113,11 @@ public class StreamingState implements StreamEventHandler
return this.peers;
}
+ public String completeMessage()
+ {
+ return this.completeMessage;
+ }
+
public Status status()
{
return status;
@@ -283,6 +297,8 @@ public class StreamingState implements StreamEventHandler
{
completeMessage = Throwables.getStackTraceAsString(throwable);
updateState(Status.FAILURE);
        // the failure stack trace makes this entry much larger than the
initial estimate, so re-insert it to recompute its cache weight
+ StreamManager.instance.addStreamingStateAgain(this);
}
private synchronized void updateState(Status state)
diff --git
a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index d7a78e39f6..eaa37bbca8 100644
---
a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++
b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -151,7 +151,7 @@ public class SessionInfoCompositeData
connecting,
fromArrayOfCompositeData((CompositeData[]) values[5], toStreamSummary),
fromArrayOfCompositeData((CompositeData[]) values[6], toStreamSummary),
-
StreamSession.State.valueOf((String) values[7]));
+
StreamSession.State.valueOf((String) values[7]), null); // null is here to
maintain backwards compatibility
Function<CompositeData, ProgressInfo> toProgressInfo = new
Function<CompositeData, ProgressInfo>()
{
public ProgressInfo apply(CompositeData input)
diff --git a/src/java/org/apache/cassandra/utils/ObjectSizes.java
b/src/java/org/apache/cassandra/utils/ObjectSizes.java
index 9e1b407f1d..2d94983d18 100644
--- a/src/java/org/apache/cassandra/utils/ObjectSizes.java
+++ b/src/java/org/apache/cassandra/utils/ObjectSizes.java
@@ -19,6 +19,9 @@
package org.apache.cassandra.utils;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import org.github.jamm.MemoryLayoutSpecification;
@@ -39,6 +42,8 @@ public class ObjectSizes
private static final long DIRECT_BUFFER_HEAP_SIZE =
measure(ByteBuffer.allocateDirect(0));
+ public static final long IPV6_SOCKET_ADDRESS_SIZE =
ObjectSizes.measureDeep(new InetSocketAddress(getIpvAddress(16), 42));
+
/**
* Memory a byte array consumes
*
@@ -236,4 +241,20 @@ public class ObjectSizes
{
return meter.measure(pojo);
}
+
+ private static InetAddress getIpvAddress(int size)
+ {
+ if (size == 16 || size ==4)
+ {
+ try
+ {
+ return InetAddress.getByAddress(new byte[size]);
+ }
+ catch (UnknownHostException e)
+ {
                throw new IllegalArgumentException("Invalid size of a byte
array when getting an ipv address: " + size);
+ }
+ }
        else throw new IllegalArgumentException("Expected a byte array size
of 4 or 16 for an ipv address but got: " + size);
+ }
}
diff --git a/src/java/org/apache/cassandra/utils/TimeUUID.java
b/src/java/org/apache/cassandra/utils/TimeUUID.java
index 8d79096ecd..b944d843d4 100644
--- a/src/java/org/apache/cassandra/utils/TimeUUID.java
+++ b/src/java/org/apache/cassandra/utils/TimeUUID.java
@@ -82,6 +82,7 @@ public class TimeUUID implements Serializable,
Comparable<TimeUUID>
private static final long MIN_CLOCK_SEQ_AND_NODE = 0x8080808080808080L;
private static final long MAX_CLOCK_SEQ_AND_NODE = 0x7f7f7f7f7f7f7f7fL;
+ public static final long TIMEUUID_SIZE = ObjectSizes.measureDeep(new
TimeUUID(10, 10));
final long uuidTimestamp, lsb;
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/AbstractStreamFailureLogs.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/AbstractStreamFailureLogs.java
new file mode 100644
index 0000000000..28aa6b0bcc
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/AbstractStreamFailureLogs.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableZeroCopyWriter;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.assertj.core.api.Assertions;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class AbstractStreamFailureLogs extends TestBaseImpl
+{
+ protected static final Logger logger =
LoggerFactory.getLogger(AbstractStreamFailureLogs.class);
+
+ protected static final int FAILING_NODE = 2;
+
+ protected void streamTest(boolean zeroCopyStreaming, String reason,
Integer failedNode) throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withInstanceInitializer(BBStreamHelper::install)
+ .withConfig(c -> c.with(Feature.values())
+
.set("stream_entire_sstables", zeroCopyStreaming)
+ // when die, this will
try to halt JVM, which is easier to validate in the test
+ // other levels
require checking state of the subsystems
+
.set("disk_failure_policy", "die"))
+ .start())
+ {
+ init(cluster);
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY)"));
+
+ triggerStreaming(cluster);
+ // make sure disk failure policy is not triggered
+
+ IInvokableInstance failingNode = cluster.get(failedNode);
+
+ searchForLog(failingNode, reason);
+ }
+ }
+
+ protected void triggerStreaming(Cluster cluster)
+ {
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+
+ // repair will do streaming IFF there is a mismatch; so cause one
+ for (int i = 0; i < 10; i++)
+ node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES
(?)"), i); // timestamp won't match, causing a mismatch
+
+ // trigger streaming; expected to fail as streaming socket closed in
the middle (currently this is an unrecoverable event)
+ //Blocks until the stream is complete
+ node2.nodetoolResult("repair", "-full", KEYSPACE,
"tbl").asserts().failure();
+ }
+
+ protected void searchForLog(IInvokableInstance failingNode, String reason)
+ {
+ searchForLog(failingNode, true, reason);
+ }
+
+ protected boolean searchForLog(IInvokableInstance failingNode, boolean
failIfNoMatch, String reason)
+ {
+ LogResult<List<String>> result = failingNode.logs().grepForErrors(-1,
Pattern.compile("Stream failed:"));
+ // grepForErrors will include all ERROR logs even if they don't match
the pattern; for this reason need to filter after the fact
+ List<String> matches = result.getResult();
+
+ matches = matches.stream().filter(s ->
s.startsWith("WARN")).collect(Collectors.toList());
+ logger.info("Stream failed logs found: {}", String.join("\n",
matches));
+ if (matches.isEmpty() && !failIfNoMatch)
+ return false;
+
+ Assertions.assertThat(matches)
+ .describedAs("node%d expected to find %s but could not",
failingNode.config().num(), reason)
+ .hasSize(1);
+ String logLine = matches.get(0);
+ Assertions.assertThat(logLine).contains(reason);
+
+ Matcher match = Pattern.compile(".*\\[Stream
#(.*)\\]").matcher(logLine);
+ if (!match.find()) throw new AssertionError("Unable to parse: " +
logLine);
+ UUID planId = UUID.fromString(match.group(1));
+ SimpleQueryResult qr = failingNode.executeInternalWithResult("SELECT *
FROM system_views.streaming WHERE id=?", planId);
+ Assertions.assertThat(qr.hasNext()).isTrue();
+
Assertions.assertThat(qr.next().getString("failure_cause")).contains(reason);
+ return true;
+ }
+
+ public static class BBStreamHelper
+ {
+ @SuppressWarnings("unused")
+ public static int writeDirectlyToChannel(ByteBuffer buf, @SuperCall
Callable<Integer> zuper) throws Exception
+ {
+ if (isCaller(SSTableZeroCopyWriter.class.getName(), "write"))
+ throw new RuntimeException("TEST");
+ // different context; pass through
+ return zuper.call();
+ }
+
+ @SuppressWarnings("unused")
+ public static boolean append(UnfilteredRowIterator partition,
@SuperCall Callable<Boolean> zuper) throws Exception
+ {
+ if (isCaller(CassandraIncomingFile.class.getName(), "read")) //
handles compressed and non-compressed
+ throw new java.nio.channels.ClosedChannelException();
+ // different context; pass through
+ return zuper.call();
+ }
+
+ public static void install(ClassLoader classLoader, Integer num)
+ {
+ if (num != FAILING_NODE)
+ return;
+ new ByteBuddy().rebase(SequentialWriter.class)
+
.method(named("writeDirectlyToChannel").and(takesArguments(1)))
+
.intercept(MethodDelegation.to(BBStreamHelper.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+
+ new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
+ .method(named("append").and(takesArguments(1)))
+
.intercept(MethodDelegation.to(BBStreamHelper.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+
+ }
+ }
+
+ protected static boolean isCaller(String klass, String method)
+ {
+ StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+ for (int i = 0; i < stack.length; i++)
+ {
+ StackTraceElement e = stack[i];
+ if (klass.equals(e.getClassName()) &&
method.equals(e.getMethodName()))
+ return true;
+ }
+ return false;
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/BoundExceptionTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/BoundExceptionTest.java
new file mode 100644
index 0000000000..23c445c99f
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/BoundExceptionTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import org.junit.Test;
+
+import org.apache.cassandra.streaming.StreamSession;
+
+import static org.junit.Assert.assertEquals;
+
/**
 * Unit tests for {@link StreamSession#boundStackTrace}, which renders an exception chain
 * while printing at most a fixed number of stack frames per exception.
 */
public class BoundExceptionTest
{
    // Maximum number of frames boundStackTrace may emit per exception in these tests.
    private static final int LIMIT = 2;

    @Test
    public void testSingleException()
    {
        // exception() attaches LIMIT * 2 synthetic frames; only the first LIMIT should be printed.
        Throwable exceptionToTest = exception("test exception");
        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(exceptionToTest, LIMIT, new StringBuilder());

        String expectedStackTrace = "java.lang.RuntimeException: test exception\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n";

        assertEquals(expectedStackTrace, boundedStackTrace.toString());
    }

    @Test
    public void testNestedException()
    {
        // Causes are rendered after the outer exception, each bounded to LIMIT frames.
        Throwable exceptionToTest = exception(exception("the disk /foo/var is bad", exception("Bad disk somewhere")));
        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(exceptionToTest, LIMIT, new StringBuilder());

        String expectedStackTrace = "java.lang.RuntimeException: java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n" +
                                    "java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "java.lang.RuntimeException: Bad disk somewhere\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n";

        assertEquals(expectedStackTrace, boundedStackTrace.toString());
    }

    @Test
    public void testExceptionCycle()
    {
        // A cause cycle must terminate with a CIRCULAR REFERENCE marker rather than recurse forever.
        Exception e1 = exception("Test exception 1");
        Exception e2 = exception("Test exception 2");

        e1.initCause(e2);
        e2.initCause(e1);

        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(e1, LIMIT, new StringBuilder());
        String expectedStackTrace = "java.lang.RuntimeException: Test exception 1\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n" +
                                    "java.lang.RuntimeException: Test exception 2\n" +
                                    "[CIRCULAR REFERENCE: java.lang.RuntimeException: Test exception 1]\n";

        assertEquals(expectedStackTrace, boundedStackTrace.toString());
    }

    @Test
    public void testEmptyStackTrace()
    {
        // An exception with zero frames should still print its headline.
        Throwable exceptionToTest = exception("there are words here", 0);

        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(exceptionToTest, LIMIT, new StringBuilder());
        String expectedStackTrace = "java.lang.RuntimeException: there are words here\n";

        assertEquals(expectedStackTrace, boundedStackTrace.toString());
    }

    @Test
    public void testEmptyNestedStackTrace()
    {
        // Outer and middle exceptions carry no frames; only the innermost cause has frames to print.
        Throwable exceptionToTest = exception(exception("the disk /foo/var is bad", exception("Bad disk somewhere"), 0), 0);

        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(exceptionToTest, LIMIT, new StringBuilder());
        String expectedStackTrace = "java.lang.RuntimeException: java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "java.lang.RuntimeException: Bad disk somewhere\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n";

        assertEquals(expectedStackTrace, boundedStackTrace.toString());
    }

    @Test
    public void testLimitLargerThanStackTrace()
    {
        // The outer exception has a single frame, fewer than LIMIT; everything available is printed.
        Throwable exceptionToTest = exception(exception("the disk /foo/var is bad", exception("Bad disk somewhere")), 1);

        StringBuilder boundedStackTrace = StreamSession.boundStackTrace(exceptionToTest, LIMIT, new StringBuilder());
        String expectedStackTrace = "java.lang.RuntimeException: java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "java.lang.RuntimeException: the disk /foo/var is bad\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "java.lang.RuntimeException: Bad disk somewhere\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:0)\n" +
                                    "\torg.apache.cassandra.distributed.test.streaming.BoundExceptionTest.method(BoundExceptionTest.java:1)\n";

        assertEquals(expectedStackTrace, boundedStackTrace.toString());
    }

    // Builds `length` synthetic frames, each pointing at BoundExceptionTest.method with the
    // frame index used as the line number (so frame i renders as "...java:i").
    private static StackTraceElement[] frames(int length)
    {
        StackTraceElement[] frames = new StackTraceElement[length];
        for (int i = 0; i < length; i++)
            frames[i] = new StackTraceElement(BoundExceptionTest.class.getCanonicalName(), "method", BoundExceptionTest.class.getSimpleName() + ".java", i);
        return frames;
    }

    private static RuntimeException exception(String msg)
    {
        return exception(msg, null);
    }

    private static RuntimeException exception(String msg, int length)
    {
        return exception(msg, null, length);
    }

    private static RuntimeException exception(Throwable cause)
    {
        return exception(null, cause);
    }

    private static RuntimeException exception(Throwable cause, int length)
    {
        return exception(null, cause, length);
    }

    private static RuntimeException exception(String msg, Throwable cause)
    {
        // Default to twice the bound so that truncation is observable.
        return exception(msg, cause, LIMIT * 2);
    }

    // Creates a RuntimeException with the given message/cause and a synthetic stack of
    // `length` frames; null msg/cause select the matching RuntimeException constructor.
    private static RuntimeException exception(String msg, Throwable cause, int length)
    {
        RuntimeException e;
        if (msg != null && cause != null) e = new RuntimeException(msg, cause);
        else if (msg != null) e = new RuntimeException(msg);
        else if (cause != null) e = new RuntimeException(cause);
        else e = new RuntimeException();
        e.setStackTrace(frames(length));
        return e;
    }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionFailedTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionFailedTest.java
new file mode 100644
index 0000000000..b04ea1e057
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionFailedTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
/**
 * Verifies that when the remote peer fails a stream session, the other side's log contains
 * a summary naming the failed peer ("Remote peer /127.0.0.2:7012 failed stream session").
 */
public class StreamFailureLogsFailureDueToSessionFailedTest extends AbstractStreamFailureLogs
{
    @Test
    public void failureDueToSessionFailed() throws IOException
    {
        // Searches node 1's log for the failure summary after the stream fails.
        // NOTE(review): the boolean argument presumably toggles zero-copy streaming —
        // confirm against AbstractStreamFailureLogs.streamTest.
        streamTest(true,"Remote peer /127.0.0.2:7012 failed stream session", 1);
    }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionTimeoutTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionTimeoutTest.java
new file mode 100644
index 0000000000..b473597a5b
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureDueToSessionTimeoutTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.UncheckedTimeoutException;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableZeroCopyWriter;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.Shared;
+import org.awaitility.Awaitility;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
/**
 * Verifies that a stream session which stops making progress is failed with a
 * "Session timed out" log message: the failing node is instrumented (see
 * {@link BBStreamTimeoutHelper}) to block inside the file-receive path while
 * stream_transfer_task_timeout is set to 1ms, so the stalled transfer task exceeds
 * its deadline and the session is torn down.
 */
public class StreamFailureLogsFailureDueToSessionTimeoutTest extends AbstractStreamFailureLogs
{
    @Test
    public void failureDueToSessionTimeout() throws IOException
    {
        try (Cluster cluster = Cluster.build(2)
                                      .withInstanceInitializer(BBStreamTimeoutHelper::install)
                                      .withConfig(c -> c.with(Feature.values())
                                                        // 1ms: any transfer task that stalls (as the instrumentation
                                                        // forces) is past its deadline by the first timeout check
                                                        .set("stream_transfer_task_timeout", "1ms"))
                                      .start())
        {

            init(cluster);
            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY)"));

            // Run streaming in the background; it blocks inside the instrumented node
            // until UNBLOCK_STREAM is signalled below.
            ForkJoinPool.commonPool().execute(() -> triggerStreaming(cluster));
            try
            {
                // Wait until the stream is known to be running (and blocked), or the
                // timeout has already surfaced in node 1's log.
                Awaitility.await("Did not see stream running or timed out")
                          .atMost(3, TimeUnit.MINUTES)
                          .until(() -> State.STREAM_IS_RUNNING.await(false) || searchForLog(cluster.get(1), false, "Session timed out"));
            }
            finally
            {
                // Always release the stalled stream so background work can finish even
                // if the await above failed.
                State.UNBLOCK_STREAM.signal();
            }
            Awaitility.await("Unable to find 'Session timed out'")
                      .atMost(1, TimeUnit.MINUTES)
                      .until(() -> searchForLog(cluster.get(1), false, "Session timed out"));
        }
    }

    /**
     * Conditions shared (via {@link Shared}) across classloaders so the test JVM can
     * coordinate with the instrumented in-JVM dtest instance.
     */
    @Shared
    public static class State
    {
        // Set by the interceptors once streaming has reached the blocked write/append.
        public static final TestCondition STREAM_IS_RUNNING = new TestCondition();
        // Set by the test to release the blocked stream.
        public static final TestCondition UNBLOCK_STREAM = new TestCondition();
    }

    /**
     * A minimal one-shot condition: {@link #signal()} flips a volatile flag and wakes any
     * waiter; {@link #await(boolean)} polls the flag with bounded waits until a one-minute
     * deadline. Because the flag is volatile and each wait is capped at 500ms, a missed
     * notify can only delay wake-up, never lose the signal.
     */
    @Shared
    public static class TestCondition
    {
        private volatile boolean signaled = false;

        /** Waits up to one minute, throwing on timeout. */
        public void await()
        {
            await(true);
        }

        /**
         * Waits up to one minute for {@link #signal()}.
         *
         * @param throwOnTimeout when true, throws UncheckedTimeoutException on timeout;
         *                       when false, returns false instead
         * @return true if the condition was signalled before the deadline
         */
        public boolean await(boolean throwOnTimeout)
        {
            long deadlineNanos = Clock.Global.nanoTime() + TimeUnit.MINUTES.toNanos(1);
            while (!signaled)
            {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - Clock.Global.nanoTime());
                if (remainingMillis <= 0)
                {
                    if (throwOnTimeout) throw new UncheckedTimeoutException("Condition not met within 1 minute");
                    return false;
                }
                // await may block signal from triggering notify, so make sure not to block for more than 500ms
                remainingMillis = Math.min(remainingMillis, 500);
                synchronized (this)
                {
                    try
                    {
                        this.wait(remainingMillis);
                    }
                    catch (InterruptedException e)
                    {
                        throw new AssertionError(e);
                    }
                }
            }
            return true;
        }

        /** Marks the condition met and wakes a waiter (flag is set first, so order is safe). */
        public void signal()
        {
            signaled = true;
            synchronized (this)
            {
                this.notify();
            }
        }
    }

    /**
     * Byte Buddy interceptors installed only on the failing node. Both receive paths —
     * zero-copy via SequentialWriter.writeDirectlyToChannel, and regular via
     * RangeAwareSSTableWriter.append — signal that the stream is running and then block
     * until the test releases them, forcing the session timeout.
     */
    public static class BBStreamTimeoutHelper
    {
        @SuppressWarnings("unused")
        public static int writeDirectlyToChannel(ByteBuffer buf, @SuperCall Callable<Integer> zuper) throws Exception
        {
            if (isCaller(SSTableZeroCopyWriter.class.getName(), "write"))
            {
                State.STREAM_IS_RUNNING.signal();
                State.UNBLOCK_STREAM.await();
            }
            // different context; pass through
            return zuper.call();
        }

        @SuppressWarnings("unused")
        public static boolean append(UnfilteredRowIterator partition, @SuperCall Callable<Boolean> zuper) throws Exception
        {
            if (isCaller(CassandraIncomingFile.class.getName(), "read")) // handles compressed and non-compressed
            {
                State.STREAM_IS_RUNNING.signal();
                State.UNBLOCK_STREAM.await();
            }
            // different context; pass through
            return zuper.call();
        }

        /** Installs the interceptors, but only on the node designated to fail. */
        public static void install(ClassLoader classLoader, Integer num)
        {
            if (num != FAILING_NODE)
                return;
            new ByteBuddy().rebase(SequentialWriter.class)
                           .method(named("writeDirectlyToChannel").and(takesArguments(1)))
                           .intercept(MethodDelegation.to(BBStreamTimeoutHelper.class))
                           .make()
                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);

            new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
                           .method(named("append").and(takesArguments(1)))
                           .intercept(MethodDelegation.to(BBStreamTimeoutHelper.class))
                           .make()
                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
        }

    }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithEOFTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithEOFTest.java
new file mode 100644
index 0000000000..9b0f4f4dfd
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithEOFTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
/**
 * Verifies that a ClosedChannelException raised mid-transfer is reported with the peer and
 * session state ("... Failed because there was an java.nio.channels.ClosedChannelException
 * with state=STREAMING"), searched for on FAILING_NODE's log.
 */
public class StreamFailureLogsFailureInTheMiddleWithEOFTest extends AbstractStreamFailureLogs
{
    @Test
    public void failureInTheMiddleWithEOF() throws IOException
    {
        // NOTE(review): the boolean argument presumably toggles zero-copy streaming —
        // confirm against AbstractStreamFailureLogs.streamTest.
        streamTest(false, "Session peer /127.0.0.1:7012 Failed because there was an java.nio.channels.ClosedChannelException with state=STREAMING", FAILING_NODE);
    }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithUnknownTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithUnknownTest.java
new file mode 100644
index 0000000000..9d2507bbf6
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureLogsFailureInTheMiddleWithUnknownTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
/**
 * Verifies that an unexpected RuntimeException raised mid-transfer surfaces in
 * FAILING_NODE's log as "java.lang.RuntimeException: TEST" (the message thrown by the
 * instrumentation in AbstractStreamFailureLogs).
 */
public class StreamFailureLogsFailureInTheMiddleWithUnknownTest extends AbstractStreamFailureLogs
{
    @Test
    public void failureInTheMiddleWithUnknown() throws IOException
    {
        // NOTE(review): the boolean argument presumably toggles zero-copy streaming —
        // confirm against AbstractStreamFailureLogs.streamTest.
        streamTest(true, "java.lang.RuntimeException: TEST", FAILING_NODE);
    }
}
diff --git
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index b5bb40c3c1..64389221a1 100644
---
a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++
b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -221,7 +221,7 @@ public class ZeroCopyStreamingBenchmark
StreamResultFuture future =
StreamResultFuture.createInitiator(nextTimeUUID(), StreamOperation.BOOTSTRAP,
Collections.<StreamEventHandler>emptyList(), streamCoordinator);
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
- streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.INITIALIZED));
+ streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.INITIALIZED, null));
StreamSession session =
streamCoordinator.getOrCreateOutboundSession(peer);
session.init(future);
diff --git
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index a65b2ea3a0..d0a81028cc 100644
---
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -209,7 +209,7 @@ public class CassandraEntireSSTableStreamWriterTest
StreamResultFuture future =
StreamResultFuture.createInitiator(nextTimeUUID(), StreamOperation.BOOTSTRAP,
Collections.<StreamEventHandler>emptyList(), streamCoordinator);
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
- streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.INITIALIZED));
+ streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.INITIALIZED, null));
StreamSession session =
streamCoordinator.getOrCreateOutboundSession(peer);
session.init(future);
diff --git
a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
index 5733b2fa5a..8107fd9789 100644
---
a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
+++
b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
@@ -324,7 +324,7 @@ public class
EntireSSTableStreamConcurrentComponentMutationTest
StreamResultFuture future =
StreamResultFuture.createInitiator(nextTimeUUID(), StreamOperation.BOOTSTRAP,
Collections.emptyList(), streamCoordinator);
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
- streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.INITIALIZED));
+ streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.INITIALIZED, null));
StreamSession session =
streamCoordinator.getOrCreateOutboundSession(peer);
session.init(future);
diff --git
a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
index c8e3d89f6b..56d01935ed 100644
--- a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
@@ -96,9 +96,9 @@ public class StreamingVirtualTableTest extends CQLTester
assertRows(execute(t("select id, follower, operation, peers, status,
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
new Object[] { state.id(), true, "Repair",
Collections.emptyList(), "start", 0F, new Date(state.lastUpdatedAtMillis()),
null, null });
- state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.PREPARING), StreamSession.PrepareDirection.ACK));
+ state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.PREPARING, null), StreamSession.PrepareDirection.ACK));
- state.onSuccess(new StreamState(state.id(), StreamOperation.REPAIR,
ImmutableSet.of(new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(),
Collections.emptyList(), StreamSession.State.COMPLETE))));
+ state.onSuccess(new StreamState(state.id(), StreamOperation.REPAIR,
ImmutableSet.of(new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(),
Collections.emptyList(), StreamSession.State.COMPLETE, null))));
assertRows(execute(t("select id, follower, operation, peers, status,
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
new Object[] { state.id(), true, "Repair",
Arrays.asList(address(127, 0, 0, 2).toString()), "success", 100F, new
Date(state.lastUpdatedAtMillis()), null, null });
}
@@ -121,8 +121,8 @@ public class StreamingVirtualTableTest extends CQLTester
StreamResultFuture future = state.future();
state.phase.start();
- SessionInfo s1 = new SessionInfo(PEER2, 0,
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()),
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
- SessionInfo s2 = new SessionInfo(PEER3, 0,
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()),
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
+ SessionInfo s1 = new SessionInfo(PEER2, 0,
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()),
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING,
null);
+ SessionInfo s2 = new SessionInfo(PEER3, 0,
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()),
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING,
null);
// we only update stats on ACK
state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), s1,
StreamSession.PrepareDirection.ACK));
diff --git
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
index 34ce09c3f6..5a8f4a58cf 100644
---
a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
+++
b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
@@ -212,7 +212,8 @@ public class EntireSSTableStreamingCorrectFilesCountTest
peer,
Collections.emptyList(),
Collections.emptyList(),
-
StreamSession.State.INITIALIZED));
+
StreamSession.State.INITIALIZED,
+ null));
StreamSession session =
streamCoordinator.getOrCreateOutboundSession(peer);
session.init(future);
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index 45172fe14c..778b6b2d71 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -46,7 +46,7 @@ public class SessionInfoTest
}
StreamSummary sending = new StreamSummary(tableId, 10, 100);
- SessionInfo info = new SessionInfo(local, 0, local, summaries,
Collections.singleton(sending), StreamSession.State.PREPARING);
+ SessionInfo info = new SessionInfo(local, 0, local, summaries,
Collections.singleton(sending), StreamSession.State.PREPARING, null);
assert info.getTotalFilesToReceive() == 45;
assert info.getTotalFilesToSend() == 10;
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/NetStatsTest.java
b/test/unit/org/apache/cassandra/tools/nodetool/NetStatsTest.java
index f8bd530539..7bddc9b23a 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/NetStatsTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/NetStatsTest.java
@@ -117,7 +117,8 @@ public class NetStatsTest extends CQLTester
InetAddressAndPort.getLocalHost(),
streamSummaries,
streamSummaries,
- State.COMPLETE);
+ State.COMPLETE,
+ null);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream out = new PrintStream(baos))
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]