Updated Branches:
  refs/heads/trunk 6158c6428 -> 67ccdabfe

Fix deadlock in new Streaming code

patch by slebresne; reviewed by jbellis for CASSANDRA-5699


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67ccdabf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67ccdabf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67ccdabf

Branch: refs/heads/trunk
Commit: 67ccdabfe8c48229e184d1374f9c6435ccea93ec
Parents: 6158c64
Author: Sylvain Lebresne <[email protected]>
Authored: Sat Jun 22 12:14:32 2013 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Wed Jul 3 19:55:41 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../net/IncomingStreamingConnection.java        |  26 +-
 .../cassandra/streaming/ConnectionHandler.java  | 272 ++++++++++++-------
 .../cassandra/streaming/StreamManager.java      |   7 +-
 .../apache/cassandra/streaming/StreamPlan.java  |   2 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   2 +-
 .../cassandra/streaming/StreamResultFuture.java |  94 ++++---
 .../cassandra/streaming/StreamSession.java      | 248 ++++++++++-------
 .../streaming/messages/FileMessage.java         |   6 +
 .../streaming/messages/StreamInitMessage.java   |  21 +-
 10 files changed, 437 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8b12243..281a0aa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,7 +60,7 @@
  * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
  * Include a timestamp with all read commands to determine column expiration
    (CASSANDRA-5149)
- * Streaming 2.0 (CASSANDRA-5286)
+ * Streaming 2.0 (CASSANDRA-5286, 5699)
  * Conditional create/drop ks/table/index statements in CQL3 (CASSANDRA-2737)
  * more pre-table creation property validation (CASSANDRA-5693)
  * Redesign repair messages (CASSANDRA-5426)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java 
b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index dd5b7b4..24b2bab 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -25,6 +25,8 @@ import java.net.Socket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
@@ -58,7 +60,29 @@ public class IncomingStreamingConnection extends Thread
             DataInput input = new DataInputStream(socket.getInputStream());
             StreamInitMessage init = 
StreamInitMessage.serializer.deserialize(input, version);
 
-            StreamSession.startReceivingStreamAsync(init.planId, 
init.description, socket, version);
+            // We will use the current socket for the incoming stream. So if the 
other side is the
+            // stream initiator, we must first create an outgoing stream, 
after which real streaming
+            // will start. If we were the initiator however, this socket will 
just be our incoming
+            // stream, everything is set up and we can initiate real streaming 
by sending the prepare message.
+            // Note: we cannot use the same socket for incoming and outgoing 
streams because we want to
+            // parallelize said streams and the socket is blocking, so we 
might deadlock.
+            if (init.sentByInitiator)
+            {
+                StreamResultFuture.initReceivingSide(init.planId, 
init.description, init.from, socket, version);
+            }
+            else
+            {
+                StreamResultFuture stream = 
StreamManager.instance.getStream(init.planId);
+                if (stream == null)
+                {
+                    // This should not happen. All we can do is close the 
socket to inform the other side, but that's a bug.
+                    logger.error("Got StreamInit message for a stream we are 
supposed to be the initiator of, but stream not found.");
+                    socket.close();
+                    return;
+                }
+                // We're fully set up for this session, start the actual 
streaming
+                stream.startStreaming(init.from, socket, version);
+            }
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java 
b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 27ea5af..6be1f42 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.streaming;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
 import java.nio.channels.Channels;
@@ -26,11 +27,16 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +44,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * ConnectionHandler manages incoming/outgoing message exchange for the {@link 
StreamSession}.
@@ -54,44 +61,69 @@ public class ConnectionHandler
     private static final int MAX_CONNECT_ATTEMPTS = 3;
 
     private final StreamSession session;
-    private final int protocolVersion;
 
     private IncomingMessageHandler incoming;
     private OutgoingMessageHandler outgoing;
 
-    private boolean connected = false;
-    private Socket socket;
-
     ConnectionHandler(StreamSession session)
     {
         this.session = session;
-        this.protocolVersion = StreamMessage.CURRENT_VERSION;
     }
 
-    ConnectionHandler(StreamSession session, Socket socket, int 
protocolVersion)
+    public ConnectionHandler initiate() throws IOException
     {
-        this.session = session;
-        this.socket = Preconditions.checkNotNull(socket);
-        this.connected = socket.isConnected();
-        this.protocolVersion = protocolVersion;
+        // Connect to other side and use that as the outgoing socket. Once the 
receiving
+        // peer sends back its init message, we'll have our incoming handling.
+        outgoing = new OutgoingMessageHandler(session, connect(session.peer), 
StreamMessage.CURRENT_VERSION);
+
+        logger.debug("Sending stream init... for {}", session.planId());
+        outgoing.sendInitMessage(true);
+        outgoing.start();
+
+        return this;
+    }
+
+    public ConnectionHandler initiateOnReceivingSide(Socket incomingSocket, 
int version) throws IOException
+    {
+        // Create and start the incoming handler
+        incoming = new IncomingMessageHandler(session, incomingSocket, 
version);
+        incoming.start();
+
+        // Connect back to the other side, and use that new socket for the 
outgoing handler
+        outgoing = new OutgoingMessageHandler(session, connect(session.peer), 
version);
+
+        logger.debug("Sending stream init back to initiator...");
+        outgoing.sendInitMessage(false);
+        outgoing.start();
+        return this;
+    }
+
+    public void attachIncomingSocket(Socket incomingSocket, int version) 
throws IOException
+    {
+        incoming = new IncomingMessageHandler(session, incomingSocket, 
version);
+        incoming.start();
     }
 
     /**
      * Connect to peer and start exchanging message.
      * When connect attempt fails, this retries for maximum of 
MAX_CONNECT_ATTEMPTS times.
      *
+     * @param peer the peer to connect to.
+     * @return the created socket.
+     *
      * @throws IOException when connection failed.
      */
-    public void connect() throws IOException
+    private static Socket connect(InetAddress peer) throws IOException
     {
         int attempts = 0;
         while (true)
         {
             try
             {
-                socket = 
MessagingService.instance().getConnectionPool(session.peer).newSocket();
+                logger.info("Connecting to {} for streaming", peer);
+                Socket socket = 
MessagingService.instance().getConnectionPool(peer).newSocket();
                 
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
-                break;
+                return socket;
             }
             catch (IOException e)
             {
@@ -99,7 +131,7 @@ public class ConnectionHandler
                     throw e;
 
                 long waitms = DatabaseDescriptor.getRpcTimeout() * 
(long)Math.pow(2, attempts);
-                logger.warn("Failed attempt " + attempts + " to connect to " + 
session.peer + ". Retrying in " + waitms + " ms. (" + e + ")");
+                logger.warn("Failed attempt " + attempts + " to connect to " + 
peer + ". Retrying in " + waitms + " ms. (" + e + ")");
                 try
                 {
                     Thread.sleep(waitms);
@@ -110,64 +142,16 @@ public class ConnectionHandler
                 }
             }
         }
-        // send stream init message
-        SocketChannel channel = socket.getChannel();
-        WritableByteChannel out = channel;
-        // socket channel is null when encrypted(SSL)
-        if (channel == null)
-        {
-            out = Channels.newChannel(socket.getOutputStream());
-        }
-        logger.debug("Sending stream init...");
-        StreamInitMessage message = new StreamInitMessage(session.planId(), 
session.description());
-        out.write(message.createMessage(false, protocolVersion));
-
-        connected = true;
-
-        start();
-        session.onConnect();
     }
 
-    public void close()
+    public ListenableFuture<?> close()
     {
-        incoming.terminate();
-        outgoing.terminate();
-        if (socket != null && !isConnected())
-        {
-            try
-            {
-                socket.close();
-            }
-            catch (IOException ignore) {}
-        }
-    }
+        logger.debug("Closing stream connection handler on {}", session.peer);
 
-    /**
-     * Start incoming/outgoing messaging threads.
-     */
-    public void start() throws IOException
-    {
-        SocketChannel channel = socket.getChannel();
-        ReadableByteChannel in = channel;
-        WritableByteChannel out = channel;
-        // socket channel is null when encrypted(SSL)
-        if (channel == null)
-        {
-            in = Channels.newChannel(socket.getInputStream());
-            out = Channels.newChannel(socket.getOutputStream());
-        }
-
-        incoming = new IncomingMessageHandler(session, protocolVersion, in);
-        outgoing = new OutgoingMessageHandler(session, protocolVersion, out);
+        ListenableFuture<?> inClosed = incoming == null ? 
Futures.immediateFuture(null) : incoming.close();
+        ListenableFuture<?> outClosed = outgoing == null ? 
Futures.immediateFuture(null) : outgoing.close();
 
-        // ready to send/receive files
-        new Thread(incoming, "STREAM-IN-" + session.peer).start();
-        new Thread(outgoing, "STREAM-OUT-" + session.peer).start();
-    }
-
-    public boolean isConnected()
-    {
-        return connected;
+        return Futures.allAsList(inClosed, outClosed);
     }
 
     /**
@@ -183,30 +167,83 @@ public class ConnectionHandler
 
     public void sendMessage(StreamMessage message)
     {
-        assert isConnected();
+        if (outgoing.isClosed())
+            throw new RuntimeException("Outgoing stream handler has been 
closed");
+
         outgoing.enqueue(message);
     }
 
     abstract static class MessageHandler implements Runnable
     {
         protected final StreamSession session;
+
+        protected final Socket socket;
         protected final int protocolVersion;
-        private volatile boolean terminated;
 
-        protected MessageHandler(StreamSession session, int protocolVersion)
+        private final AtomicReference<SettableFuture<?>> closeFuture = new 
AtomicReference<>();
+
+        protected MessageHandler(StreamSession session, Socket socket, int 
protocolVersion)
         {
             this.session = session;
+            this.socket = socket;
             this.protocolVersion = protocolVersion;
         }
 
-        public void terminate()
+        protected abstract String name();
+
+        protected WritableByteChannel getWriteChannel() throws IOException
         {
-            terminated = true;
+            WritableByteChannel out = socket.getChannel();
+            // socket channel is null when encrypted(SSL)
+            return out == null
+                 ? Channels.newChannel(socket.getOutputStream())
+                 : out;
         }
 
-        public boolean terminated()
+        protected ReadableByteChannel getReadChannel() throws IOException
         {
-            return terminated;
+            ReadableByteChannel in = socket.getChannel();
+            // socket channel is null when encrypted(SSL)
+            return in == null
+                 ? Channels.newChannel(socket.getInputStream())
+                 : in;
+        }
+
+        public void sendInitMessage(boolean sentByInitiator) throws IOException
+        {
+            StreamInitMessage message = new 
StreamInitMessage(FBUtilities.getBroadcastAddress(), session.planId(), 
session.description(), sentByInitiator);
+            getWriteChannel().write(message.createMessage(false, 
protocolVersion));
+        }
+
+        public void start()
+        {
+            new Thread(this, name() + "-" + session.peer).start();
+        }
+
+        public ListenableFuture<?> close()
+        {
+            // Assume it wasn't closed. Not a huge deal if we create a future 
on a race
+            SettableFuture<?> future = SettableFuture.create();
+            return closeFuture.compareAndSet(null, future)
+                 ? future
+                 : closeFuture.get();
+        }
+
+        public boolean isClosed()
+        {
+            return closeFuture.get() != null;
+        }
+
+        protected void signalCloseDone()
+        {
+            closeFuture.get().set(null);
+
+            // We can now close the socket
+            try
+            {
+                socket.close();
+            }
+            catch (IOException ignore) {}
         }
     }
 
@@ -217,33 +254,44 @@ public class ConnectionHandler
     {
         private final ReadableByteChannel in;
 
-        IncomingMessageHandler(StreamSession session, int protocolVersion, 
ReadableByteChannel in)
+        IncomingMessageHandler(StreamSession session, Socket socket, int 
protocolVersion) throws IOException
+        {
+            super(session, socket, protocolVersion);
+            this.in = getReadChannel();
+        }
+
+        protected String name()
         {
-            super(session, protocolVersion);
-            this.in = in;
+            return "STREAM-IN";
         }
 
         public void run()
         {
-            while (!terminated())
+            while (!isClosed())
             {
                 try
                 {
                     // receive message
                     StreamMessage message = StreamMessage.deserialize(in, 
protocolVersion, session);
-                    assert message != null;
-                    session.messageReceived(message);
+                    // Might be null if there is an error during streaming 
(see FileMessage.deserialize). It's ok
+                    // to ignore here since we'll have asked for a retry.
+                    if (message != null)
+                    {
+                        logger.debug("Received {}", message);
+                        session.messageReceived(message);
+                    }
                 }
                 catch (SocketException e)
                 {
                     // socket is closed
-                    terminate();
+                    close();
                 }
                 catch (Throwable e)
                 {
                     session.onError(e);
                 }
             }
+            signalCloseDone();
         }
     }
 
@@ -268,10 +316,15 @@ public class ConnectionHandler
 
         private final WritableByteChannel out;
 
-        OutgoingMessageHandler(StreamSession session, int protocolVersion, 
WritableByteChannel out)
+        OutgoingMessageHandler(StreamSession session, Socket socket, int 
protocolVersion) throws IOException
+        {
+            super(session, socket, protocolVersion);
+            this.out = getWriteChannel();
+        }
+
+        protected String name()
         {
-            super(session, protocolVersion);
-            this.out = out;
+            return "STREAM-OUT";
         }
 
         public void enqueue(StreamMessage message)
@@ -281,29 +334,52 @@ public class ConnectionHandler
 
         public void run()
         {
-            while (!terminated())
+            StreamMessage next;
+            while (!isClosed())
             {
                 try
                 {
-                    StreamMessage next = messageQueue.poll(1, 
TimeUnit.SECONDS);
-                    if (next != null)
+                    if ((next = messageQueue.poll(1, TimeUnit.SECONDS)) != 
null)
                     {
-                        logger.debug("Sending " + next);
-                        StreamMessage.serialize(next, out, protocolVersion, 
session);
+                        logger.debug("Sending {}", next);
+                        sendMessage(next);
                         if (next.type == StreamMessage.Type.SESSION_FAILED)
-                            terminate();
+                            close();
                     }
                 }
-                catch (SocketException e)
-                {
-                    session.onError(e);
-                    terminate();
-                }
-                catch (InterruptedException | IOException e)
+                catch (InterruptedException e)
                 {
-                    session.onError(e);
+                    throw new AssertionError(e);
                 }
             }
+
+            try
+            {
+                // Sends the last messages on the queue
+                while ((next = messageQueue.poll()) != null)
+                    sendMessage(next);
+            }
+            finally
+            {
+                signalCloseDone();
+            }
+        }
+
+        private void sendMessage(StreamMessage message)
+        {
+            try
+            {
+                StreamMessage.serialize(message, out, protocolVersion, 
session);
+            }
+            catch (SocketException e)
+            {
+                session.onError(e);
+                close();
+            }
+            catch (IOException e)
+            {
+                session.onError(e);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java 
b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 8ea9976..50f8978 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -55,7 +55,7 @@ public class StreamManager implements StreamManagerMBean
      */
     public static RateLimiter getRateLimiter()
     {
-        double currentThroughput = ((double) 
DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * 1024 * 1024 / 
8 / 1000;
+        double currentThroughput = ((double) 
DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * 1024 * 1024;
         // if throughput is set to 0, throttling is disabled
         if (currentThroughput == 0)
             currentThroughput = Double.MAX_VALUE;
@@ -91,4 +91,9 @@ public class StreamManager implements StreamManagerMBean
 
         currentStreams.put(result.planId, result);
     }
+
+    public StreamResultFuture getStream(UUID planId)
+    {
+        return currentStreams.get(planId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java 
b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 78d50ad..08ab014 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -139,7 +139,7 @@ public class StreamPlan
      */
     public StreamResultFuture execute()
     {
-        return StreamResultFuture.startStreamingAsync(planId, description, 
sessions.values());
+        return StreamResultFuture.init(planId, description, sessions.values());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java 
b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 4fe180c..ac21352 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -53,7 +53,7 @@ public class StreamReceiveTask extends StreamTask
      *
      * @param sstable SSTable file received.
      */
-    public void receive(SSTableReader sstable)
+    public void received(SSTableReader sstable)
     {
         assert cfId.equals(sstable.metadata.cfId);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 84332bd..cc2432a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -17,7 +17,9 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.io.IOException;
 import java.net.InetAddress;
+import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -29,29 +31,34 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.utils.FBUtilities;
 
+
 /**
- * StreamResultFuture asynchronously returns the final {@link StreamState} of 
execution of {@link StreamPlan}.
+ * A future on the result ({@link StreamState}) of a streaming plan.
+ *
+ * In practice, this object also groups all the {@link StreamSession} for the 
streaming job
+ * involved. One StreamSession will be created for every peer involved and 
said session will
+ * handle every streaming (outgoing and incoming) to that peer for this job.
  * <p>
- * You can attach {@link StreamEventHandler} to this object to listen on 
{@link StreamEvent}s to track progress of the streaming.
+ * The future will return a result once every session is completed 
(successfully or not). If
+ * any session ended up with an error, the future will throw a StreamException.
+ * <p>
+ * You can attach {@link StreamEventHandler} to this object to listen on 
{@link StreamEvent}s to
+ * track progress of the streaming.
  */
 public final class StreamResultFuture extends AbstractFuture<StreamState>
 {
-    // Executor that establish the streaming connection. Once we're connected 
to the other end, the rest of the streaming
-    // is directly handled by the ConnectionHandler incoming and outgoing 
threads.
-    private static final DebuggableThreadPoolExecutor streamExecutor = 
DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
-                                                                               
                                             
FBUtilities.getAvailableProcessors());
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamResultFuture.class);
 
     public final UUID planId;
     public final String description;
     private final List<StreamEventHandler> eventListeners = 
Collections.synchronizedList(new ArrayList<StreamEventHandler>());
-    private final Set<UUID> ongoingSessions;
+
+    private final Map<InetAddress, StreamSession> ongoingSessions;
     private final Map<InetAddress, SessionInfo> sessionStates = new 
NonBlockingHashMap<>();
 
     /**
@@ -63,39 +70,63 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
      * @param description Stream description
      * @param numberOfSessions number of sessions to wait for complete
      */
-    private StreamResultFuture(UUID planId, String description, Set<UUID> 
sessions)
+    private StreamResultFuture(UUID planId, String description, 
Collection<StreamSession> sessions)
     {
         this.planId = planId;
         this.description = description;
-        this.ongoingSessions = sessions;
+        this.ongoingSessions = new HashMap<>(sessions.size());
+        for (StreamSession session : sessions)
+            this.ongoingSessions.put(session.peer, session);;
 
         // if there is no session to listen to, we immediately set result for 
returning
         if (sessions.isEmpty())
             set(getCurrentState());
     }
 
-    static StreamResultFuture startStreamingAsync(UUID planId, String 
description, Collection<StreamSession> sessions)
+    static StreamResultFuture init(UUID planId, String description, 
Collection<StreamSession> sessions)
     {
-        Set<UUID> sessionsIds = new HashSet<>(sessions.size());
-        for (StreamSession session : sessions)
-            sessionsIds.add(session.id);
-
-        StreamResultFuture future = new StreamResultFuture(planId, 
description, sessionsIds);
-
-        StreamManager.instance.register(future);
+        StreamResultFuture future = createAndRegister(planId, description, 
sessions);
 
         // start sessions
-        for (StreamSession session : sessions)
+        for (final StreamSession session : sessions)
         {
-            session.register(future);
-            // register to gossiper/FD to fail on node failure
-            Gossiper.instance.register(session);
-            
FailureDetector.instance.registerFailureDetectionEventListener(session);
-            streamExecutor.submit(session);
+            session.init(future);
+            session.start();
         }
+
         return future;
     }
 
+    public static StreamResultFuture initReceivingSide(UUID planId, String 
description, InetAddress from, Socket socket, int version)
+    {
+        final StreamSession session = new StreamSession(from);
+
+        // The main reason we create a StreamResultFuture on the receiving 
side is for JMX exposure.
+        StreamResultFuture future = createAndRegister(planId, description, 
Collections.singleton(session));
+
+        session.init(future);
+        session.start(socket, version);
+
+        return future;
+    }
+
+    private static StreamResultFuture createAndRegister(UUID planId, String 
description, Collection<StreamSession> sessions)
+    {
+        StreamResultFuture future = new StreamResultFuture(planId, 
description, sessions);
+        StreamManager.instance.register(future);
+        return future;
+    }
+
+    public void startStreaming(InetAddress from, Socket socket, int version) 
throws IOException
+    {
+        StreamSession session = ongoingSessions.get(from);
+        if (session == null)
+            throw new RuntimeException(String.format("Got connection from %s 
for stream session %s but no such session locally", from, planId));
+
+        session.handler.attachIncomingSocket(socket, version);
+        session.onInitializationComplete();
+    }
+
     public void addEventListener(StreamEventHandler listener)
     {
         Futures.addCallback(this, listener);
@@ -135,13 +166,12 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
 
     void handleSessionComplete(StreamSession session)
     {
-        Gossiper.instance.unregister(session);
-        
FailureDetector.instance.unregisterFailureDetectionEventListener(session);
+        logger.debug("Session with {} is complete", session.peer);
 
         SessionInfo sessionInfo = session.getSessionInfo();
         sessionStates.put(sessionInfo.peer, sessionInfo);
         fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
-        maybeComplete(session.id);
+        maybeComplete(session);
     }
 
     public void handleProgress(ProgressInfo progress)
@@ -157,9 +187,9 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
             listener.handleStreamEvent(event);
     }
 
-    private synchronized void maybeComplete(UUID sessionId)
+    private synchronized void maybeComplete(StreamSession session)
     {
-        ongoingSessions.remove(sessionId);
+        ongoingSessions.remove(session.peer);
         if (ongoingSessions.isEmpty())
         {
             StreamState finalState = getCurrentState();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 9c3dfaa..6a771a1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowPosition;
@@ -45,40 +46,79 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
- * StreamSession is the center of Cassandra Streaming API.
+ * Handles the streaming of one or more sections of one or more sstables to and 
from a specific
+ * remote node.
  *
- * StreamSession on the both endpoints exchange messages and files until 
complete.
+ * Both this node and the remote one will create a similar symmetrical 
StreamSession. A streaming
+ * session has the following life-cycle:
  *
- * It is created through {@link StreamPlan} on the initiator node,
- * and also is created directly from connected socket on the other end when 
received init message.
+ * 1. Connections Initialization
  *
- * <p>
- * StreamSession goes through several stages:
- * <ol>
- *  <li>
- *    Init
- *    <p>StreamSession in one end send init message to the other end.</p>
- *  </li>
- *  <li>
- *    Prepare
- *    <p>StreamSession in both endpoints are created, so in this phase, they 
exchange
- *    request and summary messages to prepare receiving/streaming files in 
next phase.</p>
- *  </li>
- *  <li>
- *    Stream
- *    <p>StreamSessions in both ends stream and receive files.</p>
- *  </li>
- *  <li>
- *    Complete
- *    <p>Session completes if both endpoints completed by exchanging complete 
message.</p>
- *  </li>
- * </ol>
+ *   (a) A node (the initiator in the following) creates a new StreamSession, 
initializes it (init())
+ *       and then starts it (start()). Start will create a {@link 
ConnectionHandler} that will create
+ *       a connection to the remote node (the follower in the following) with 
whom to stream and send
+ *       a StreamInit message. This first connection will be the outgoing 
connection for the
+ *       initiator.
+ *   (b) Upon reception of that StreamInit message, the follower creates its 
own StreamSession,
+ *       initializes it and starts it using start(Socket, int). This creates the 
follower
+ *       ConnectionHandler, which will use the just opened connection as 
incoming connection. It
+ *       will then connect back to the initiator and send its own StreamInit 
message on that new
+ *       connection. This new connection will be the outgoing connection for 
the follower.
+ *   (c) On receiving the follower StreamInit message, the initiator will 
record that new connection
+ *       as its own incoming connection and call the Session 
onInitializationComplete() method to start
+ *       the streaming prepare phase (StreamResultFuture.startStreaming()).
+ *
+ * 2. Streaming preparation phase
+ *
+ *   (a) This phase is started when the initiator onInitializationComplete() 
method is called. This method sends a
+ *       PrepareMessage that includes what files/sections this node will 
stream to the follower
+ *       (stored in a StreamTransferTask, each column family has its own 
transfer task) and what
+ *       the follower needs to stream back (StreamReceiveTask, same as above). 
If the initiator has
+ *       nothing to receive from the follower, it goes directly to its 
Streaming phase. Otherwise,
+ *       it waits for the follower PrepareMessage.
+ *   (b) Upon reception of the PrepareMessage, the follower records which 
files/sections it will receive
+ *       and sends back its own PrepareMessage with a summary of the 
files/sections that will be sent to
+ *       the initiator (prepare()). After having sent that message, the 
follower goes to its Streaming
+ *       phase.
+ *   (c) When the initiator receives the follower PrepareMessage, it records 
which files/sections it will
+ *       receive and then goes to its own Streaming phase.
+ *
+ * 3. Streaming phase
+ *
+ *   (a) The streaming phase is started by each node (the sender in the 
following, but note that each side
+ *       of the StreamSession may be sender for some of the files) involved by 
calling startStreamingFiles().
+ *       This will sequentially send a FileMessage for each file of each 
StreamTransferTask. Each FileMessage
+ *       consists of a FileMessageHeader that indicates which file is coming 
and then start streaming the
+ *       content for that file (StreamWriter in FileMessage.serialize()). When 
a file is fully sent, the
+ *       fileSent() method is called for that file. If all the files for a 
StreamTransferTask are sent
+ *       (StreamTransferTask.complete()), the task is marked complete 
(taskCompleted()).
+ *   (b) On the receiving side, a SSTable will be written for the incoming 
file (StreamReader in
+ *       FileMessage.deserialize()) and once the FileMessage is fully 
received, the file will be marked as
+ *       complete (received()). When all files for the StreamReceiveTask have 
been received, the sstables
+ *       are added to the CFS (and secondary indexes are built, 
StreamReceiveTask.complete()) and the task
+ *       is marked complete (taskCompleted()).
+ *   (c) If during the streaming of a particular file an I/O error occurs on 
the receiving end of a stream
+ *       (FileMessage.deserialize), the node will retry the file (up to 
DatabaseDescriptor.getMaxStreamingRetries())
+ *       by sending a RetryMessage to the sender. On receiving a RetryMessage, 
the sender simply issues a new
+ *       FileMessage for that file.
+ *   (d) When all transfer and receive tasks for a session are complete, the 
node moves to the Completion phase
+ *       (maybeCompleted()).
+ *
+ * 4. Completion phase
+ *
+ *   (a) When a node has finished all transfer and receive tasks, it enters the 
completion phase (maybeCompleted()).
+ *       If it had already received a CompleteMessage from the other side (it 
is in the WAIT_COMPLETE state), that
+ *       session is done and is closed (closeSession()). Otherwise, the node 
switches to the WAIT_COMPLETE state and
+ *       sends a CompleteMessage to the other side.
  */
-public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class StreamSession implements IEndpointStateChangeSubscriber, 
IFailureDetectionEventListener
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamSession.class);
 
-    public final UUID id = UUIDGen.getTimeUUID();
+    // Executor that establishes the streaming connection. Once we're connected 
to the other end, the rest of the streaming
+    // is directly handled by the ConnectionHandler incoming and outgoing 
threads.
+    private static final DebuggableThreadPoolExecutor streamExecutor = 
DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
+                                                                               
                                             
FBUtilities.getAvailableProcessors());
     public final InetAddress peer;
 
     // should not be null when session is started
@@ -98,7 +138,7 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
 
     public static enum State
     {
-        INITIALIZING,
+        INITIALIZED,
         PREPARING,
         STREAMING,
         WAIT_COMPLETE,
@@ -106,7 +146,7 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
         FAILED,
     }
 
-    private volatile State state = State.INITIALIZING;
+    private volatile State state = State.INITIALIZED;
 
     /**
      * Create new streaming session with the peer.
@@ -120,19 +160,6 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
         this.metrics = StreamingMetrics.get(peer);
     }
 
-    /**
-     * Create streaming session from established connection.
-     *
-     * @param socket established connection
-     * @param protocolVersion Streaming protocol verison
-     */
-    public StreamSession(Socket socket, int protocolVersion)
-    {
-        this.peer = socket.getInetAddress();
-        this.handler = new ConnectionHandler(this, socket, protocolVersion);
-        this.metrics = StreamingMetrics.get(peer);
-    }
-
     public UUID planId()
     {
         return streamResult == null ? null : streamResult.planId;
@@ -143,25 +170,67 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
         return streamResult == null ? null : streamResult.description;
     }
 
-    public static StreamSession startReceivingStreamAsync(UUID planId, String 
description, Socket socket, int version)
-    {
-        StreamSession session = new StreamSession(socket, version);
-        StreamResultFuture.startStreamingAsync(planId, description, 
Collections.singleton(session));
-        return session;
-    }
-
     /**
-     * Bind this session to report to specific {@link StreamResultFuture}.
+     * Bind this session to report to specific {@link StreamResultFuture} and
+     * perform pre-streaming initialization.
      *
      * @param streamResult result to report to
      * @return this object for chaining
      */
-    public StreamSession register(StreamResultFuture streamResult)
+    public void init(StreamResultFuture streamResult)
     {
         this.streamResult = streamResult;
-        return this;
+
+        // register to gossiper/FD to fail on node failure
+        Gossiper.instance.register(this);
+        FailureDetector.instance.registerFailureDetectionEventListener(this);
+
     }
 
+    public void start()
+    {
+        if (requests.isEmpty() && transfers.isEmpty())
+        {
+            logger.debug("Session does not have any tasks.");
+            closeSession(State.COMPLETE);
+            return;
+        }
+
+        streamExecutor.execute(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    handler.initiate();
+                }
+                catch (IOException e)
+                {
+                    onError(e);
+                }
+            }
+        });
+    }
+
+    public void start(final Socket socket, final int version)
+    {
+        streamExecutor.execute(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    handler.initiateOnReceivingSide(socket, version);
+                }
+                catch (IOException e)
+                {
+                    onError(e);
+                }
+            }
+        });
+    }
+
+
     /**
      * Request data fetch task to this session.
      *
@@ -240,38 +309,17 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
         }
     }
 
-    /**
-     * Start this stream session.
-     */
-    public void run()
+    private void closeSession(State finalState)
     {
-        assert streamResult != null : "No result is associated with this 
session";
+        state(finalState);
 
-        try
-        {
-            if (handler.isConnected())
-            {
-                // if this session is created from remote...
-                handler.start();
-            }
-            else
-            {
-                if (requests.isEmpty() && transfers.isEmpty())
-                {
-                    logger.debug("Session does not have any tasks.");
-                    state(State.COMPLETE);
-                    streamResult.handleSessionComplete(this);
-                }
-                else
-                {
-                    handler.connect();
-                }
-            }
-        }
-        catch (IOException e)
-        {
-            onError(e);
-        }
+        // Note that we shouldn't block on this close because this method is 
called on the handler
+        // incoming thread (so we would deadlock).
+        handler.close();
+
+        Gossiper.instance.unregister(this);
+        FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+        streamResult.handleSessionComplete(this);
     }
 
     /**
@@ -312,7 +360,7 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
                 break;
 
             case FILE:
-                receive((FileMessage) message);
+                received((FileMessage) message);
                 break;
 
             case RETRY:
@@ -331,11 +379,9 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
     }
 
     /**
-     * Call back for connection success.
-     *
-     * When connected, session moves to preparing phase and sends prepare 
message.
+     * Call back when connection initialization is complete to start the 
prepare phase.
      */
-    public void onConnect()
+    public void onInitializationComplete()
     {
         logger.debug("Connected. Sending prepare...");
 
@@ -362,13 +408,11 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
      */
     public void onError(Throwable e)
     {
-        state(State.FAILED);
-
         logger.error("Streaming error occurred", e);
         // send session failure message
         handler.sendMessage(new SessionFailedMessage());
         // fail session
-        streamResult.handleSessionComplete(this);
+        closeSession(State.FAILED);
     }
 
     /**
@@ -376,7 +420,8 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
      */
     public void prepare(Collection<StreamRequest> requests, 
Collection<StreamSummary> summaries)
     {
-        logger.debug("Start preparing this session (" + requests.size() + " 
requests, " + summaries.size() + " columnfamilies receiving)");
+        logger.debug("Start preparing this session (" + requests.size() + " to 
send, " + summaries.size() + " to receive)");
+
         // prepare tasks
         state(State.PREPARING);
         for (StreamRequest request : requests)
@@ -418,11 +463,11 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
      *
      * @param message received file
      */
-    public void receive(FileMessage message)
+    public void received(FileMessage message)
     {
         StreamingMetrics.totalIncomingBytes.inc(message.header.size());
         metrics.incomingBytes.inc(message.header.size());
-        receivers.get(message.header.cfId).receive(message.sstable);
+        receivers.get(message.header.cfId).received(message.sstable);
     }
 
     public void progress(Descriptor desc, ProgressInfo.Direction direction, 
long bytes, long total)
@@ -450,9 +495,7 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
     {
         if (state == State.WAIT_COMPLETE)
         {
-            state(State.COMPLETE);
-            handler.close();
-            streamResult.handleSessionComplete(this);
+            closeSession(State.COMPLETE);
         }
         else
         {
@@ -465,8 +508,7 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
      */
     public synchronized void sessionFailed()
     {
-        handler.close();
-        streamResult.handleSessionComplete(this);
+        closeSession(State.FAILED);
     }
 
     public void doRetry(FileMessageHeader header, Throwable e)
@@ -529,8 +571,7 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
         if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
             return;
 
-        state(State.FAILED);
-        streamResult.handleSessionComplete(this);
+        closeSession(State.FAILED);
     }
 
     private boolean maybeCompleted()
@@ -540,9 +581,7 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
         {
             if (state == State.WAIT_COMPLETE)
             {
-                state(State.COMPLETE);
-                handler.close();
-                streamResult.handleSessionComplete(this);
+                closeSession(State.COMPLETE);
             }
             else
             {
@@ -554,7 +593,6 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
         return completed;
     }
 
-
     /**
      * Flushes matching column families from the given keyspace, or all 
columnFamilies
      * if the cf list is empty.
@@ -570,7 +608,7 @@ public class StreamSession implements Runnable, 
IEndpointStateChangeSubscriber,
 
     private void prepareReceiving(StreamSummary summary)
     {
-        logger.debug("prepare receiving " + summary);
+        logger.debug("Prepare for receiving " + summary);
         if (summary.files > 0)
             receivers.put(summary.cfId, new StreamReceiveTask(this, 
summary.cfId, summary.files, summary.totalSize));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
index fe05eac..464c35a 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
@@ -103,4 +103,10 @@ public class FileMessage extends StreamMessage
                                             sections,
                                             compressionInfo);
     }
+
+    @Override
+    public String toString()
+    {
+        return "FileMessage(" + sstable + ")";
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67ccdabf/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index d1e797c..685ad41 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -20,12 +20,14 @@ package org.apache.cassandra.streaming.messages;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
 
@@ -37,13 +39,19 @@ public class StreamInitMessage
 {
     public static IVersionedSerializer<StreamInitMessage> serializer = new 
StreamInitMessageSerializer();
 
+    public final InetAddress from;
     public final UUID planId;
     public final String description;
 
-    public StreamInitMessage(UUID planId, String description)
+    // Whether the sender of this message is the stream initiator
+    public final boolean sentByInitiator;
+
+    public StreamInitMessage(InetAddress from, UUID planId, String 
description, boolean sentByInitiator)
     {
+        this.from = from;
         this.planId = planId;
         this.description = description;
+        this.sentByInitiator = sentByInitiator;
     }
 
     /**
@@ -95,20 +103,27 @@ public class StreamInitMessage
     {
         public void serialize(StreamInitMessage message, DataOutput out, int 
version) throws IOException
         {
+            CompactEndpointSerializationHelper.serialize(message.from, out);
             UUIDSerializer.serializer.serialize(message.planId, out, 
MessagingService.current_version);
             out.writeUTF(message.description);
+            out.writeBoolean(message.sentByInitiator);
         }
 
         public StreamInitMessage deserialize(DataInput in, int version) throws 
IOException
         {
+            InetAddress from = 
CompactEndpointSerializationHelper.deserialize(in);
             UUID planId = UUIDSerializer.serializer.deserialize(in, 
MessagingService.current_version);
-            return new StreamInitMessage(planId, in.readUTF());
+            String description = in.readUTF();
+            boolean sentByInitiator = in.readBoolean();
+            return new StreamInitMessage(from, planId, description, 
sentByInitiator);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
         {
-            long size = 
UUIDSerializer.serializer.serializedSize(message.planId, 
MessagingService.current_version);
+            long size = 
CompactEndpointSerializationHelper.serializedSize(message.from);
+            size += UUIDSerializer.serializer.serializedSize(message.planId, 
MessagingService.current_version);
             size += TypeSizes.NATIVE.sizeof(message.description);
+            size += TypeSizes.NATIVE.sizeof(message.sentByInitiator);
             return size;
         }
     }

Reply via email to