Updated Branches: refs/heads/trunk 6158c6428 -> 67ccdabfe
Fix deadlock in new Streaming code patch by slebresne; reviewed by jbellis for CASSANDRA-5699 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67ccdabf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67ccdabf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67ccdabf Branch: refs/heads/trunk Commit: 67ccdabfe8c48229e184d1374f9c6435ccea93ec Parents: 6158c64 Author: Sylvain Lebresne <[email protected]> Authored: Sat Jun 22 12:14:32 2013 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Wed Jul 3 19:55:41 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../net/IncomingStreamingConnection.java | 26 +- .../cassandra/streaming/ConnectionHandler.java | 272 ++++++++++++------- .../cassandra/streaming/StreamManager.java | 7 +- .../apache/cassandra/streaming/StreamPlan.java | 2 +- .../cassandra/streaming/StreamReceiveTask.java | 2 +- .../cassandra/streaming/StreamResultFuture.java | 94 ++++--- .../cassandra/streaming/StreamSession.java | 248 ++++++++++------- .../streaming/messages/FileMessage.java | 6 + .../streaming/messages/StreamInitMessage.java | 21 +- 10 files changed, 437 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8b12243..281a0aa 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -60,7 +60,7 @@ * cqlsh: Add row count to SELECT output (CASSANDRA-5636) * Include a timestamp with all read commands to determine column expiration (CASSANDRA-5149) - * Streaming 2.0 (CASSANDRA-5286) + * Streaming 2.0 (CASSANDRA-5286, 5699) * Conditional create/drop ks/table/index statements in CQL3 (CASSANDRA-2737) * more pre-table creation property validation (CASSANDRA-5693) * 
Redesign repair messages (CASSANDRA-5426) http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index dd5b7b4..24b2bab 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -25,6 +25,8 @@ import java.net.Socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; @@ -58,7 +60,29 @@ public class IncomingStreamingConnection extends Thread DataInput input = new DataInputStream(socket.getInputStream()); StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version); - StreamSession.startReceivingStreamAsync(init.planId, init.description, socket, version); + // We will use the current socket to incoming stream. So if the other side is the + // stream initiator, we must first create an outgoing stream, after which real streaming + // will start. If we were the initiator however, this socket will just be our incoming + // stream, everything is setup and we can initiate real streaming by sending the prepare message. + // Note: we cannot use the same socket for incoming and outgoing streams because we want to + // parallelize said streams and the socket is blocking, so we might deadlock. 
+ if (init.sentByInitiator) + { + StreamResultFuture.initReceivingSide(init.planId, init.description, init.from, socket, version); + } + else + { + StreamResultFuture stream = StreamManager.instance.getStream(init.planId); + if (stream == null) + { + // This should not happen. All we can do is close the socket to inform the other side, but that's a bug. + logger.error("Got StreamInit message for a stream we are supposed to be the initiator of, but stream not found."); + socket.close(); + return; + } + // We're fully setup for this session, start the actual streaming + stream.startStreaming(init.from, socket, version); + } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 27ea5af..6be1f42 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -18,6 +18,7 @@ package org.apache.cassandra.streaming; import java.io.IOException; +import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; import java.nio.channels.Channels; @@ -26,11 +27,16 @@ import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; import java.util.Collection; import java.util.Comparator; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import 
com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +44,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; +import org.apache.cassandra.utils.FBUtilities; /** * ConnectionHandler manages incoming/outgoing message exchange for the {@link StreamSession}. @@ -54,44 +61,69 @@ public class ConnectionHandler private static final int MAX_CONNECT_ATTEMPTS = 3; private final StreamSession session; - private final int protocolVersion; private IncomingMessageHandler incoming; private OutgoingMessageHandler outgoing; - private boolean connected = false; - private Socket socket; - ConnectionHandler(StreamSession session) { this.session = session; - this.protocolVersion = StreamMessage.CURRENT_VERSION; } - ConnectionHandler(StreamSession session, Socket socket, int protocolVersion) + public ConnectionHandler initiate() throws IOException { - this.session = session; - this.socket = Preconditions.checkNotNull(socket); - this.connected = socket.isConnected(); - this.protocolVersion = protocolVersion; + // Connect to other side and use that as the outgoing socket. Once the receiving + // peer send back his init message, we'll have our incoming handling. + outgoing = new OutgoingMessageHandler(session, connect(session.peer), StreamMessage.CURRENT_VERSION); + + logger.debug("Sending stream init... 
for {}", session.planId()); + outgoing.sendInitMessage(true); + outgoing.start(); + + return this; + } + + public ConnectionHandler initiateOnReceivingSide(Socket incomingSocket, int version) throws IOException + { + // Create and start the incoming handler + incoming = new IncomingMessageHandler(session, incomingSocket, version); + incoming.start(); + + // Connect back to the other side, and use that new socket for the outgoing handler + outgoing = new OutgoingMessageHandler(session, connect(session.peer), version); + + logger.debug("Sending stream init back to initiator..."); + outgoing.sendInitMessage(false); + outgoing.start(); + return this; + } + + public void attachIncomingSocket(Socket incomingSocket, int version) throws IOException + { + incoming = new IncomingMessageHandler(session, incomingSocket, version); + incoming.start(); } /** * Connect to peer and start exchanging message. * When connect attempt fails, this retries for maximum of MAX_CONNECT_ATTEMPTS times. * + * @param peer the peer to connect to. + * @return the created socket. + * * @throws IOException when connection failed. */ - public void connect() throws IOException + private static Socket connect(InetAddress peer) throws IOException { int attempts = 0; while (true) { try { - socket = MessagingService.instance().getConnectionPool(session.peer).newSocket(); + logger.info("Connecting to {} for streaming", peer); + Socket socket = MessagingService.instance().getConnectionPool(peer).newSocket(); socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); - break; + return socket; } catch (IOException e) { @@ -99,7 +131,7 @@ public class ConnectionHandler throw e; long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts); - logger.warn("Failed attempt " + attempts + " to connect to " + session.peer + ". Retrying in " + waitms + " ms. (" + e + ")"); + logger.warn("Failed attempt " + attempts + " to connect to " + peer + ". Retrying in " + waitms + " ms. 
(" + e + ")"); try { Thread.sleep(waitms); @@ -110,64 +142,16 @@ public class ConnectionHandler } } } - // send stream init message - SocketChannel channel = socket.getChannel(); - WritableByteChannel out = channel; - // socket channel is null when encrypted(SSL) - if (channel == null) - { - out = Channels.newChannel(socket.getOutputStream()); - } - logger.debug("Sending stream init..."); - StreamInitMessage message = new StreamInitMessage(session.planId(), session.description()); - out.write(message.createMessage(false, protocolVersion)); - - connected = true; - - start(); - session.onConnect(); } - public void close() + public ListenableFuture<?> close() { - incoming.terminate(); - outgoing.terminate(); - if (socket != null && !isConnected()) - { - try - { - socket.close(); - } - catch (IOException ignore) {} - } - } + logger.debug("Closing stream connection handler on {}", session.peer); - /** - * Start incoming/outgoing messaging threads. - */ - public void start() throws IOException - { - SocketChannel channel = socket.getChannel(); - ReadableByteChannel in = channel; - WritableByteChannel out = channel; - // socket channel is null when encrypted(SSL) - if (channel == null) - { - in = Channels.newChannel(socket.getInputStream()); - out = Channels.newChannel(socket.getOutputStream()); - } - - incoming = new IncomingMessageHandler(session, protocolVersion, in); - outgoing = new OutgoingMessageHandler(session, protocolVersion, out); + ListenableFuture<?> inClosed = incoming == null ? Futures.immediateFuture(null) : incoming.close(); + ListenableFuture<?> outClosed = outgoing == null ? 
Futures.immediateFuture(null) : outgoing.close(); - // ready to send/receive files - new Thread(incoming, "STREAM-IN-" + session.peer).start(); - new Thread(outgoing, "STREAM-OUT-" + session.peer).start(); - } - - public boolean isConnected() - { - return connected; + return Futures.allAsList(inClosed, outClosed); } /** @@ -183,30 +167,83 @@ public class ConnectionHandler public void sendMessage(StreamMessage message) { - assert isConnected(); + if (outgoing.isClosed()) + throw new RuntimeException("Outgoing stream handler has been closed"); + outgoing.enqueue(message); } abstract static class MessageHandler implements Runnable { protected final StreamSession session; + + protected final Socket socket; protected final int protocolVersion; - private volatile boolean terminated; - protected MessageHandler(StreamSession session, int protocolVersion) + private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>(); + + protected MessageHandler(StreamSession session, Socket socket, int protocolVersion) { this.session = session; + this.socket = socket; this.protocolVersion = protocolVersion; } - public void terminate() + protected abstract String name(); + + protected WritableByteChannel getWriteChannel() throws IOException { - terminated = true; + WritableByteChannel out = socket.getChannel(); + // socket channel is null when encrypted(SSL) + return out == null + ? Channels.newChannel(socket.getOutputStream()) + : out; } - public boolean terminated() + protected ReadableByteChannel getReadChannel() throws IOException { - return terminated; + ReadableByteChannel in = socket.getChannel(); + // socket channel is null when encrypted(SSL) + return in == null + ? 
Channels.newChannel(socket.getInputStream()) + : in; + } + + public void sendInitMessage(boolean sentByInitiator) throws IOException + { + StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), session.planId(), session.description(), sentByInitiator); + getWriteChannel().write(message.createMessage(false, protocolVersion)); + } + + public void start() + { + new Thread(this, name() + "-" + session.peer).start(); + } + + public ListenableFuture<?> close() + { + // Assume it wasn't closed. Not a huge deal if we create a future on a race + SettableFuture<?> future = SettableFuture.create(); + return closeFuture.compareAndSet(null, future) + ? future + : closeFuture.get(); + } + + public boolean isClosed() + { + return closeFuture.get() != null; + } + + protected void signalCloseDone() + { + closeFuture.get().set(null); + + // We can now close the socket + try + { + socket.close(); + } + catch (IOException ignore) {} } } @@ -217,33 +254,44 @@ public class ConnectionHandler { private final ReadableByteChannel in; - IncomingMessageHandler(StreamSession session, int protocolVersion, ReadableByteChannel in) + IncomingMessageHandler(StreamSession session, Socket socket, int protocolVersion) throws IOException + { + super(session, socket, protocolVersion); + this.in = getReadChannel(); + } + + protected String name() { - super(session, protocolVersion); - this.in = in; + return "STREAM-IN"; } public void run() { - while (!terminated()) + while (!isClosed()) { try { // receive message StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session); - assert message != null; - session.messageReceived(message); + // Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok + // to ignore here since we'll have asked for a retry. 
+ if (message != null) + { + logger.debug("Received {}", message); + session.messageReceived(message); + } } catch (SocketException e) { // socket is closed - terminate(); + close(); } catch (Throwable e) { session.onError(e); } } + signalCloseDone(); } } @@ -268,10 +316,15 @@ public class ConnectionHandler private final WritableByteChannel out; - OutgoingMessageHandler(StreamSession session, int protocolVersion, WritableByteChannel out) + OutgoingMessageHandler(StreamSession session, Socket socket, int protocolVersion) throws IOException + { + super(session, socket, protocolVersion); + this.out = getWriteChannel(); + } + + protected String name() { - super(session, protocolVersion); - this.out = out; + return "STREAM-OUT"; } public void enqueue(StreamMessage message) @@ -281,29 +334,52 @@ public class ConnectionHandler public void run() { - while (!terminated()) + StreamMessage next; + while (!isClosed()) { try { - StreamMessage next = messageQueue.poll(1, TimeUnit.SECONDS); - if (next != null) + if ((next = messageQueue.poll(1, TimeUnit.SECONDS)) != null) { - logger.debug("Sending " + next); - StreamMessage.serialize(next, out, protocolVersion, session); + logger.debug("Sending {}", next); + sendMessage(next); if (next.type == StreamMessage.Type.SESSION_FAILED) - terminate(); + close(); } } - catch (SocketException e) - { - session.onError(e); - terminate(); - } - catch (InterruptedException | IOException e) + catch (InterruptedException e) { - session.onError(e); + throw new AssertionError(e); } } + + try + { + // Sends the last messages on the queue + while ((next = messageQueue.poll()) != null) + sendMessage(next); + } + finally + { + signalCloseDone(); + } + } + + private void sendMessage(StreamMessage message) + { + try + { + StreamMessage.serialize(message, out, protocolVersion, session); + } + catch (SocketException e) + { + session.onError(e); + close(); + } + catch (IOException e) + { + session.onError(e); + } } } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/StreamManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java index 8ea9976..50f8978 100644 --- a/src/java/org/apache/cassandra/streaming/StreamManager.java +++ b/src/java/org/apache/cassandra/streaming/StreamManager.java @@ -55,7 +55,7 @@ public class StreamManager implements StreamManagerMBean */ public static RateLimiter getRateLimiter() { - double currentThroughput = ((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * 1024 * 1024 / 8 / 1000; + double currentThroughput = ((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * 1024 * 1024; // if throughput is set to 0, throttling is disabled if (currentThroughput == 0) currentThroughput = Double.MAX_VALUE; @@ -91,4 +91,9 @@ public class StreamManager implements StreamManagerMBean currentStreams.put(result.planId, result); } + + public StreamResultFuture getStream(UUID planId) + { + return currentStreams.get(planId); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 78d50ad..08ab014 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -139,7 +139,7 @@ public class StreamPlan */ public StreamResultFuture execute() { - return StreamResultFuture.startStreamingAsync(planId, description, sessions.values()); + return StreamResultFuture.init(planId, description, sessions.values()); } /** 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 4fe180c..ac21352 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -53,7 +53,7 @@ public class StreamReceiveTask extends StreamTask * * @param sstable SSTable file received. */ - public void receive(SSTableReader sstable) + public void received(SSTableReader sstable) { assert cfId.equals(sstable.metadata.cfId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 84332bd..cc2432a 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -17,7 +17,9 @@ */ package org.apache.cassandra.streaming; +import java.io.IOException; import java.net.InetAddress; +import java.net.Socket; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.LinkedBlockingQueue; @@ -29,29 +31,34 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; -import org.apache.cassandra.concurrent.NamedThreadFactory; -import 
org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.utils.FBUtilities; + /** - * StreamResultFuture asynchronously returns the final {@link StreamState} of execution of {@link StreamPlan}. + * A future on the result ({@link StreamState}) of a streaming plan. + * + * In practice, this object also groups all the {@link StreamSession} for the streaming job + * involved. One StreamSession will be created for every peer involved and said session will + * handle every streaming (outgoing and incoming) to that peer for this job. * <p> - * You can attach {@link StreamEventHandler} to this object to listen on {@link StreamEvent}s to track progress of the streaming. + * The future will return a result once every session is completed (successfully or not). If + * any session ended up with an error, the future will throw a StreamException. + * <p> + * You can attach {@link StreamEventHandler} to this object to listen on {@link StreamEvent}s to + * track progress of the streaming. */ public final class StreamResultFuture extends AbstractFuture<StreamState> { - // Executor that establish the streaming connection. Once we're connected to the other end, the rest of the streaming - // is directly handled by the ConnectionHandler incoming and outgoing threads. 
- private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", - FBUtilities.getAvailableProcessors()); + private static final Logger logger = LoggerFactory.getLogger(StreamResultFuture.class); public final UUID planId; public final String description; private final List<StreamEventHandler> eventListeners = Collections.synchronizedList(new ArrayList<StreamEventHandler>()); - private final Set<UUID> ongoingSessions; + + private final Map<InetAddress, StreamSession> ongoingSessions; private final Map<InetAddress, SessionInfo> sessionStates = new NonBlockingHashMap<>(); /** @@ -63,39 +70,63 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> * @param description Stream description * @param numberOfSessions number of sessions to wait for complete */ - private StreamResultFuture(UUID planId, String description, Set<UUID> sessions) + private StreamResultFuture(UUID planId, String description, Collection<StreamSession> sessions) { this.planId = planId; this.description = description; - this.ongoingSessions = sessions; + this.ongoingSessions = new HashMap<>(sessions.size()); + for (StreamSession session : sessions) + this.ongoingSessions.put(session.peer, session);; // if there is no session to listen to, we immediately set result for returning if (sessions.isEmpty()) set(getCurrentState()); } - static StreamResultFuture startStreamingAsync(UUID planId, String description, Collection<StreamSession> sessions) + static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions) { - Set<UUID> sessionsIds = new HashSet<>(sessions.size()); - for (StreamSession session : sessions) - sessionsIds.add(session.id); - - StreamResultFuture future = new StreamResultFuture(planId, description, sessionsIds); - - StreamManager.instance.register(future); + StreamResultFuture future = createAndRegister(planId, description, sessions); // start 
sessions - for (StreamSession session : sessions) + for (final StreamSession session : sessions) { - session.register(future); - // register to gossiper/FD to fail on node failure - Gossiper.instance.register(session); - FailureDetector.instance.registerFailureDetectionEventListener(session); - streamExecutor.submit(session); + session.init(future); + session.start(); } + return future; } + public static StreamResultFuture initReceivingSide(UUID planId, String description, InetAddress from, Socket socket, int version) + { + final StreamSession session = new StreamSession(from); + + // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. + StreamResultFuture future = createAndRegister(planId, description, Collections.singleton(session)); + + session.init(future); + session.start(socket, version); + + return future; + } + + private static StreamResultFuture createAndRegister(UUID planId, String description, Collection<StreamSession> sessions) + { + StreamResultFuture future = new StreamResultFuture(planId, description, sessions); + StreamManager.instance.register(future); + return future; + } + + public void startStreaming(InetAddress from, Socket socket, int version) throws IOException + { + StreamSession session = ongoingSessions.get(from); + if (session == null) + throw new RuntimeException(String.format("Got connection from %s for stream session %s but no such session locally", from, planId)); + + session.handler.attachIncomingSocket(socket, version); + session.onInitializationComplete(); + } + public void addEventListener(StreamEventHandler listener) { Futures.addCallback(this, listener); @@ -135,13 +166,12 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> void handleSessionComplete(StreamSession session) { - Gossiper.instance.unregister(session); - FailureDetector.instance.unregisterFailureDetectionEventListener(session); + logger.debug("Session with {} is complete", session.peer); SessionInfo 
sessionInfo = session.getSessionInfo(); sessionStates.put(sessionInfo.peer, sessionInfo); fireStreamEvent(new StreamEvent.SessionCompleteEvent(session)); - maybeComplete(session.id); + maybeComplete(session); } public void handleProgress(ProgressInfo progress) @@ -157,9 +187,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> listener.handleStreamEvent(event); } - private synchronized void maybeComplete(UUID sessionId) + private synchronized void maybeComplete(StreamSession session) { - ongoingSessions.remove(sessionId); + ongoingSessions.remove(session.peer); if (ongoingSessions.isEmpty()) { StreamState finalState = getCurrentState(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 9c3dfaa..6a771a1 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowPosition; @@ -45,40 +46,79 @@ import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; /** - * StreamSession is the center of Cassandra Streaming API. + * Handles the streaming a one or more section of one of more sstables to and from a specific + * remote node. * - * StreamSession on the both endpoints exchange messages and files until complete. + * Both this node and the remote one will create a similar symmetrical StreamSession. 
A streaming + * session has the following life-cycle: * - * It is created through {@link StreamPlan} on the initiator node, - * and also is created directly from connected socket on the other end when received init message. + * 1. Connections Initialization * - * <p> - * StreamSession goes through several stages: - * <ol> - * <li> - * Init - * <p>StreamSession in one end send init message to the other end.</p> - * </li> - * <li> - * Prepare - * <p>StreamSession in both endpoints are created, so in this phase, they exchange - * request and summary messages to prepare receiving/streaming files in next phase.</p> - * </li> - * <li> - * Stream - * <p>StreamSessions in both ends stream and receive files.</p> - * </li> - * <li> - * Complete - * <p>Session completes if both endpoints completed by exchanging complete message.</p> - * </li> - * </ol> + * (a) A node (the initiator in the following) creates a new StreamSession, initializes it (init()) + * and then starts it (start()). Start will create a {@link ConnectionHandler} that will create + * a connection to the remote node (the follower in the following) with whom to stream and send + * a StreamInit message. This first connection will be the outgoing connection for the + * initiator. + * (b) Upon reception of that StreamInit message, the follower creates its own StreamSession, + * initializes it and starts it using start(Socket, int). This creates the follower + * ConnectionHandler, which will use the just opened connection as incoming connection. It + * will then connect back to the initiator and send its own StreamInit message on that new + * connection. This new connection will be the outgoing connection for the follower. + * (c) On receiving the follower StreamInit message, the initiator will record that new connection + * as its own incoming connection and call the Session onInitializationComplete() method to start + * the streaming prepare phase (StreamResultFuture.startStreaming()). + * + * 2. 
Streaming preparation phase + * + * (a) This phase is started when the initiator onInitializationComplete() method is called. This method sends a + * PrepareMessage that includes what files/sections this node will stream to the follower + * (stored in a StreamTransferTask, each column family has its own transfer task) and what + * the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has + * nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise, + * it waits for the follower PrepareMessage. + * (b) Upon reception of the PrepareMessage, the follower records which files/sections it will receive + * and sends back its own PrepareMessage with a summary of the files/sections that will be sent to + * the initiator (prepare()). After having sent that message, the follower goes to its Streaming + * phase. + * (c) When the initiator receives the follower PrepareMessage, it records which files/sections it will + * receive and then goes to its own Streaming phase. + * + * 3. Streaming phase + * + * (a) The streaming phase is started by each node (the sender in the following, but note that each side + * of the StreamSession may be sender for some of the files) involved by calling startStreamingFiles(). + * This will sequentially send a FileMessage for each file of each StreamTransferTask. Each FileMessage + * consists of a FileMessageHeader that indicates which file is coming and then starts streaming the + * content for that file (StreamWriter in FileMessage.serialize()). When a file is fully sent, the + * fileSent() method is called for that file. If all the files for a StreamTransferTask are sent + * (StreamTransferTask.complete()), the task is marked complete (taskCompleted()). + * (b) On the receiving side, an SSTable will be written for the incoming file (StreamReader in + * FileMessage.deserialize()) and once the FileMessage is fully received, the file will be marked as + * complete (received()). 
When all files for the StreamReceiveTask have been received, the sstables + * are added to the CFS (and secondary indexes are built, StreamReceiveTask.complete()) and the task + * is marked complete (taskCompleted()). + * (c) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream + * (FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries()) + * by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issues a new + * FileMessage for that file. + * (d) When all transfer and receive tasks for a session are complete, the session moves to the Completion phase + * (maybeCompleted()). + * + * 4. Completion phase + * + * (a) When a node has finished all transfer and receive tasks, it enters the completion phase (maybeCompleted()). + * If it had already received a CompleteMessage from the other side (it is in the WAIT_COMPLETE state), that + * session is done and is closed (closeSession()). Otherwise, the node switches to the WAIT_COMPLETE state and + * sends a CompleteMessage to the other side. */ -public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, IFailureDetectionEventListener +public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener { private static final Logger logger = LoggerFactory.getLogger(StreamSession.class); - public final UUID id = UUIDGen.getTimeUUID(); + // Executor that establish the streaming connection. Once we're connected to the other end, the rest of the streaming + // is directly handled by the ConnectionHandler incoming and outgoing threads. 
+ private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", + FBUtilities.getAvailableProcessors()); public final InetAddress peer; // should not be null when session is started @@ -98,7 +138,7 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, public static enum State { - INITIALIZING, + INITIALIZED, PREPARING, STREAMING, WAIT_COMPLETE, @@ -106,7 +146,7 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, FAILED, } - private volatile State state = State.INITIALIZING; + private volatile State state = State.INITIALIZED; /** * Create new streaming session with the peer. @@ -120,19 +160,6 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, this.metrics = StreamingMetrics.get(peer); } - /** - * Create streaming session from established connection. - * - * @param socket established connection - * @param protocolVersion Streaming protocol verison - */ - public StreamSession(Socket socket, int protocolVersion) - { - this.peer = socket.getInetAddress(); - this.handler = new ConnectionHandler(this, socket, protocolVersion); - this.metrics = StreamingMetrics.get(peer); - } - public UUID planId() { return streamResult == null ? null : streamResult.planId; @@ -143,25 +170,67 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, return streamResult == null ? null : streamResult.description; } - public static StreamSession startReceivingStreamAsync(UUID planId, String description, Socket socket, int version) - { - StreamSession session = new StreamSession(socket, version); - StreamResultFuture.startStreamingAsync(planId, description, Collections.singleton(session)); - return session; - } - /** - * Bind this session to report to specific {@link StreamResultFuture}. 
+ * Bind this session to report to specific {@link StreamResultFuture} and + * perform pre-streaming initialization. * * @param streamResult result to report to * @return this object for chaining */ - public StreamSession register(StreamResultFuture streamResult) + public void init(StreamResultFuture streamResult) { this.streamResult = streamResult; - return this; + + // register to gossiper/FD to fail on node failure + Gossiper.instance.register(this); + FailureDetector.instance.registerFailureDetectionEventListener(this); + } + public void start() + { + if (requests.isEmpty() && transfers.isEmpty()) + { + logger.debug("Session does not have any tasks."); + closeSession(State.COMPLETE); + return; + } + + streamExecutor.execute(new Runnable() + { + public void run() + { + try + { + handler.initiate(); + } + catch (IOException e) + { + onError(e); + } + } + }); + } + + public void start(final Socket socket, final int version) + { + streamExecutor.execute(new Runnable() + { + public void run() + { + try + { + handler.initiateOnReceivingSide(socket, version); + } + catch (IOException e) + { + onError(e); + } + } + }); + } + + /** * Request data fetch task to this session. * @@ -240,38 +309,17 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, } } - /** - * Start this stream session. - */ - public void run() + private void closeSession(State finalState) { - assert streamResult != null : "No result is associated with this session"; + state(finalState); - try - { - if (handler.isConnected()) - { - // if this session is created from remote... 
- handler.start(); - } - else - { - if (requests.isEmpty() && transfers.isEmpty()) - { - logger.debug("Session does not have any tasks."); - state(State.COMPLETE); - streamResult.handleSessionComplete(this); - } - else - { - handler.connect(); - } - } - } - catch (IOException e) - { - onError(e); - } + // Note that we shouldn't block on this close because this method is called on the handler + // incoming thread (so we would deadlock). + handler.close(); + + Gossiper.instance.unregister(this); + FailureDetector.instance.unregisterFailureDetectionEventListener(this); + streamResult.handleSessionComplete(this); } /** @@ -312,7 +360,7 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, break; case FILE: - receive((FileMessage) message); + received((FileMessage) message); break; case RETRY: @@ -331,11 +379,9 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, } /** - * Call back for connection success. - * - * When connected, session moves to preparing phase and sends prepare message. + * Call back when connection initialization is complete to start the prepare phase. */ - public void onConnect() + public void onInitializationComplete() { logger.debug("Connected. 
Sending prepare..."); @@ -362,13 +408,11 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, */ public void onError(Throwable e) { - state(State.FAILED); - logger.error("Streaming error occurred", e); // send session failure message handler.sendMessage(new SessionFailedMessage()); // fail session - streamResult.handleSessionComplete(this); + closeSession(State.FAILED); } /** @@ -376,7 +420,8 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, */ public void prepare(Collection<StreamRequest> requests, Collection<StreamSummary> summaries) { - logger.debug("Start preparing this session (" + requests.size() + " requests, " + summaries.size() + " columnfamilies receiving)"); + logger.debug("Start preparing this session (" + requests.size() + " to send, " + summaries.size() + " to receive)"); + // prepare tasks state(State.PREPARING); for (StreamRequest request : requests) @@ -418,11 +463,11 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, * * @param message received file */ - public void receive(FileMessage message) + public void received(FileMessage message) { StreamingMetrics.totalIncomingBytes.inc(message.header.size()); metrics.incomingBytes.inc(message.header.size()); - receivers.get(message.header.cfId).receive(message.sstable); + receivers.get(message.header.cfId).received(message.sstable); } public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total) @@ -450,9 +495,7 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, { if (state == State.WAIT_COMPLETE) { - state(State.COMPLETE); - handler.close(); - streamResult.handleSessionComplete(this); + closeSession(State.COMPLETE); } else { @@ -465,8 +508,7 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, */ public synchronized void sessionFailed() { - handler.close(); - streamResult.handleSessionComplete(this); + 
closeSession(State.FAILED); } public void doRetry(FileMessageHeader header, Throwable e) @@ -529,8 +571,7 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold()) return; - state(State.FAILED); - streamResult.handleSessionComplete(this); + closeSession(State.FAILED); } private boolean maybeCompleted() @@ -540,9 +581,7 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, { if (state == State.WAIT_COMPLETE) { - state(State.COMPLETE); - handler.close(); - streamResult.handleSessionComplete(this); + closeSession(State.COMPLETE); } else { @@ -554,7 +593,6 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, return completed; } - /** * Flushes matching column families from the given keyspace, or all columnFamilies * if the cf list is empty. @@ -570,7 +608,7 @@ public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, private void prepareReceiving(StreamSummary summary) { - logger.debug("prepare receiving " + summary); + logger.debug("Prepare for receiving " + summary); if (summary.files > 0) receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/messages/FileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessage.java b/src/java/org/apache/cassandra/streaming/messages/FileMessage.java index fe05eac..464c35a 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessage.java @@ -103,4 +103,10 @@ public class FileMessage extends StreamMessage sections, compressionInfo); } + + @Override + public String toString() + { + return "FileMessage(" + sstable + ")"; + } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index d1e797c..685ad41 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -20,12 +20,14 @@ package org.apache.cassandra.streaming.messages; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.UUID; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.UUIDSerializer; @@ -37,13 +39,19 @@ public class StreamInitMessage { public static IVersionedSerializer<StreamInitMessage> serializer = new StreamInitMessageSerializer(); + public final InetAddress from; public final UUID planId; public final String description; - public StreamInitMessage(UUID planId, String description) + // Whether the sender of this message is the stream initiator + public final boolean sentByInitiator; + + public StreamInitMessage(InetAddress from, UUID planId, String description, boolean sentByInitiator) { + this.from = from; this.planId = planId; this.description = description; + this.sentByInitiator = sentByInitiator; } /** @@ -95,20 +103,27 @@ public class StreamInitMessage { public void serialize(StreamInitMessage message, DataOutput out, int version) throws IOException { + CompactEndpointSerializationHelper.serialize(message.from, out); 
UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version); out.writeUTF(message.description); + out.writeBoolean(message.sentByInitiator); } public StreamInitMessage deserialize(DataInput in, int version) throws IOException { + InetAddress from = CompactEndpointSerializationHelper.deserialize(in); UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); - return new StreamInitMessage(planId, in.readUTF()); + String description = in.readUTF(); + boolean sentByInitiator = in.readBoolean(); + return new StreamInitMessage(from, planId, description, sentByInitiator); } public long serializedSize(StreamInitMessage message, int version) { - long size = UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version); + long size = CompactEndpointSerializationHelper.serializedSize(message.from); + size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version); size += TypeSizes.NATIVE.sizeof(message.description); + size += TypeSizes.NATIVE.sizeof(message.sentByInitiator); return size; } }
