Fix streaming_socket_timeout_in_ms not enforced. Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11286
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/561000aa Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/561000aa Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/561000aa Branch: refs/heads/cassandra-3.5 Commit: 561000aa3094699bab29766d9644ff50f6cb74f3 Parents: e94a2a0 Author: Paulo Motta <[email protected]> Authored: Fri Feb 12 12:17:01 2016 -0300 Committer: Yuki Morishita <[email protected]> Committed: Thu Mar 10 12:54:24 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 7 +++---- .../net/IncomingStreamingConnection.java | 7 ++++++- .../cassandra/streaming/ConnectionHandler.java | 21 +++++++++++++------- .../cassandra/streaming/StreamSession.java | 2 ++ 5 files changed, 26 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e7c997a..4b505f8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.14 + * Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286) * Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302) * COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053) * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176) http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 1fa04e6..0da4800 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -630,10 +630,9 @@ request_timeout_in_ms: 10000 # and the times are synchronized between the nodes. 
cross_node_timeout: false -# Enable socket timeout for streaming operation. -# When a timeout occurs during streaming, streaming is retried from the start -# of the current file. This _can_ involve re-streaming an important amount of -# data, so you should avoid setting the value too low. +# Set socket timeout for streaming operation. +# The stream session is failed if no data is received by any of the +# participants within that period. # Default value is 3600000, which means streams timeout after an hour. # streaming_socket_timeout_in_ms: 3600000 http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index 1f98bc4..5ced786 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -27,6 +27,7 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; @@ -62,6 +63,10 @@ public class IncomingStreamingConnection extends Thread implements Closeable DataInput input = new DataInputStream(socket.getInputStream()); StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version); + //Set SO_TIMEOUT on follower side + if (!init.isForOutgoing) + socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); + // The initiator makes two connections, one for incoming and one for outgoing. // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing. 
// Note: we cannot use the same socket for incoming and outgoing streams because we want to @@ -74,7 +79,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable close(); } } - + @Override public void close() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index ac267f9..52268b2 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -105,12 +105,22 @@ public class ConnectionHandler { logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer); - ListenableFuture<?> inClosed = incoming == null ? Futures.immediateFuture(null) : incoming.close(); - ListenableFuture<?> outClosed = outgoing == null ? Futures.immediateFuture(null) : outgoing.close(); + ListenableFuture<?> inClosed = closeIncoming(); + ListenableFuture<?> outClosed = closeOutgoing(); return Futures.allAsList(inClosed, outClosed); } + public ListenableFuture<?> closeOutgoing() + { + return outgoing == null ? Futures.immediateFuture(null) : outgoing.close(); + } + + public ListenableFuture<?> closeIncoming() + { + return incoming == null ? Futures.immediateFuture(null) : incoming.close(); + } + /** * Enqueue messages to be sent. * @@ -165,11 +175,8 @@ public class ConnectionHandler protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException { - ReadableByteChannel in = socket.getChannel(); - // socket channel is null when encrypted(SSL) - return in == null - ? 
Channels.newChannel(socket.getInputStream()) - : in; + //we do this instead of socket.getChannel() so socketSoTimeout is respected + return Channels.newChannel(socket.getInputStream()); } public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 98a6f1f..642e837 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -609,6 +609,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber else { state(State.WAIT_COMPLETE); + handler.closeIncoming(); } } @@ -696,6 +697,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber handler.sendMessage(new CompleteMessage()); completeSent = true; state(State.WAIT_COMPLETE); + handler.closeOutgoing(); } } return completed;
