Author: jbellis
Date: Tue Sep 14 22:25:53 2010
New Revision: 997119

URL: http://svn.apache.org/viewvc?rev=997119&view=rev
Log:
avoid exposing StreamContext outside the Session managers, and make the 
Sessions the handle for streaming work.  also removes unnecessary extra 
Collections from the session objects.
patch by jbellis; reviewed by Nick Bailey for CASSANDRA-1504

Modified:
    
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
    
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
Tue Sep 14 22:25:53 2010
@@ -41,7 +41,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.AbstractCompactedRow;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.streaming.StreamContext;
 import org.apache.cassandra.streaming.StreamIn;
 import org.apache.cassandra.streaming.StreamOut;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
@@ -538,9 +537,9 @@ public class AntiEntropyService
                 {
                     protected void runMayThrow() throws Exception
                     {
-                        StreamContext context = new 
StreamContext(request.endpoint);
-                        StreamOut.transferSSTables(context, request.cf.left, 
sstables, ranges);
-                        StreamOutSession.remove(context);
+                        StreamOutSession session = 
StreamOutSession.create(request.endpoint);
+                        StreamOut.transferSSTables(session, request.cf.left, 
sstables, ranges);
+                        session.close();
                     }
                 });
                 // request ranges from the remote node

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java 
Tue Sep 14 22:25:53 2010
@@ -38,13 +38,13 @@ class FileStatusHandler
 {
     private static Logger logger = 
LoggerFactory.getLogger(FileStatusHandler.class);
 
-    public static void onStatusChange(StreamContext context, PendingFile 
pendingFile, FileStatus streamStatus) throws IOException
+    public static void onStatusChange(StreamInSession session, PendingFile 
pendingFile, FileStatus streamStatus) throws IOException
     {
         if (FileStatus.Action.STREAM == streamStatus.getAction())
         {
             // file needs to be restreamed
-            logger.warn("Streaming of file {} from {} failed: requesting a 
retry.", pendingFile, context);
-            
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), 
context.host);
+            logger.warn("Streaming of file {} from {} failed: requesting a 
retry.", pendingFile, session);
+            
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), 
session.getHost());
             return;
         }
         assert FileStatus.Action.DELETE == streamStatus.getAction() :
@@ -54,8 +54,8 @@ class FileStatusHandler
 
         // send a StreamStatus message telling the source node it can delete 
this file
         if (logger.isDebugEnabled())
-            logger.debug("Sending a streaming finished message for {} to {}", 
pendingFile, context);
-        
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), 
context.host);
+            logger.debug("Sending a streaming finished message for {} to {}", 
pendingFile, session);
+        
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), 
session.getHost());
     }
 
     public static void addSSTable(PendingFile pendingFile)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Tue Sep 14 22:25:53 2010
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.io.*;
+import java.util.Arrays;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,14 +32,15 @@ import org.apache.cassandra.utils.Pair;
 
 public class IncomingStreamReader
 {
-    private static Logger logger = 
LoggerFactory.getLogger(IncomingStreamReader.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(IncomingStreamReader.class);
+
     private PendingFile pendingFile;
     private PendingFile lastFile;
     private FileStatus streamStatus;
-    private SocketChannel socketChannel;
-    private StreamContext context;
+    private final SocketChannel socketChannel;
     // indicates an transfer initiated by the source, as opposed to one 
requested by the recipient
-    private boolean initiatedTransfer;
+    private final boolean initiatedTransfer;
+    private final StreamInSession session;
 
     public IncomingStreamReader(StreamHeader header, SocketChannel 
socketChannel) throws IOException
     {
@@ -49,15 +51,15 @@ public class IncomingStreamReader
         // lastFile has the old context, which was registered in the manager.
         lastFile = header.getStreamFile();
         initiatedTransfer = header.initiatedTransfer;
-        context = new StreamContext(remoteAddress.getAddress(), 
header.getSessionId());
-        StreamInSession.activeStreams.put(context, pendingFile);
         assert pendingFile != null;
+        session = StreamInSession.get(remoteAddress.getAddress(), 
header.getSessionId());
+        session.addActiveStream(pendingFile);
         // For transfers setup the status and for replies to requests, prepare 
the list
         // of available files to request.
         if (initiatedTransfer)
             streamStatus = new FileStatus(lastFile.getFilename(), 
header.getSessionId());
         else if (header.getPendingFiles() != null)
-            
StreamInSession.get(context).addFilesToRequest(header.getPendingFiles());
+            session.addFilesToRequest(header.getPendingFiles());
     }
 
     public void read() throws IOException
@@ -89,7 +91,7 @@ public class IncomingStreamReader
             if (initiatedTransfer)
                 handleFileStatus(FileStatus.Action.STREAM);
             else
-                StreamIn.requestFile(context, lastFile);
+                session.requestFile(lastFile);
 
             /* Delete the orphaned file. */
             FileUtils.deleteWithConfirm(new File(pendingFile.getFilename()));
@@ -98,7 +100,7 @@ public class IncomingStreamReader
         finally
         {
             fc.close();
-            StreamInSession.activeStreams.remove(context, pendingFile);
+            session.removeActiveStream(pendingFile);
         }
 
         if (logger.isDebugEnabled())
@@ -108,13 +110,13 @@ public class IncomingStreamReader
         else
         {
             FileStatusHandler.addSSTable(pendingFile);
-            StreamInSession.get(context).finishAndRequestNext(lastFile);
+            session.finishAndRequestNext(lastFile);
         }
     }
 
     private void handleFileStatus(FileStatus.Action action) throws IOException
     {
         streamStatus.setAction(action);
-        FileStatusHandler.onStatusChange(context, pendingFile, streamStatus);
+        FileStatusHandler.onStatusChange(session, pendingFile, streamStatus);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Tue 
Sep 14 22:25:53 2010
@@ -51,24 +51,11 @@ public class StreamIn
 
         if (logger.isDebugEnabled())
             logger.debug("Requesting from {} ranges {}", source, 
StringUtils.join(ranges, ", "));
-        StreamContext context = new StreamContext(source);
-        StreamInSession.get(context);
-        Message message = new 
StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName, 
context.sessionId).makeMessage();
+        StreamInSession session = StreamInSession.create(source);
+        Message message = new 
StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName, 
session.getSessionId()).makeMessage();
         MessagingService.instance.sendOneWay(message, source);
     }
 
-    /**
-     * Request for transferring a single file. This happens subsequent of 
#requestRanges() being called.
-     * @param file Pending File that needs to be transferred
-     */
-    public static void requestFile(StreamContext context, PendingFile file)
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Requesting file {} from source {}", 
file.getFilename(), context.host);
-        Message message = new 
StreamRequestMessage(FBUtilities.getLocalAddress(), file, 
context.sessionId).makeMessage();
-        MessagingService.instance.sendOneWay(message, context.host);
-    }
-
     /** Translates remote files to local files by creating a local sstable per 
remote sstable. */
     public static PendingFile getContextMapping(PendingFile remote) throws 
IOException
     {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java 
Tue Sep 14 22:25:53 2010
@@ -23,24 +23,23 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** each context gets its own StreamInSession. So there may be >1 
StreamInManager per host */
+/** each context gets its own StreamInSession. So there may be >1 Session per 
host */
 public class StreamInSession
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamInSession.class);
 
-    private static ConcurrentMap<StreamContext, StreamInSession> 
streamManagers = new ConcurrentHashMap<StreamContext, StreamInSession>(0);
-    public static final Multimap<StreamContext, PendingFile> activeStreams = 
Multimaps.synchronizedMultimap(HashMultimap.<StreamContext, 
PendingFile>create());
-    public static final Multimap<InetAddress, StreamContext> sourceHosts = 
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, 
StreamContext>create());
-    
+    private static ConcurrentMap<StreamContext, StreamInSession> sessions = 
new NonBlockingHashMap<StreamContext, StreamInSession>();
+    private final Set<PendingFile> activeStreams = new HashSet<PendingFile>();
+
     private final List<PendingFile> pendingFiles = new 
ArrayList<PendingFile>();
     private final StreamContext context;
 
@@ -49,24 +48,44 @@ public class StreamInSession
         this.context = context;
     }
 
-    public synchronized static StreamInSession get(StreamContext context)
+    public static StreamInSession create(InetAddress host)
+    {
+        StreamContext context = new StreamContext(host);
+        StreamInSession session = new StreamInSession(context);
+        sessions.put(context, session);
+        return session;
+    }
+
+    public static StreamInSession get(InetAddress host, long sessionId)
     {
-        StreamInSession session = streamManagers.get(context);
+        StreamContext context = new StreamContext(host, sessionId);
+
+        StreamInSession session = sessions.get(context);
         if (session == null)
         {
             StreamInSession possibleNew = new StreamInSession(context);
-            if ((session = streamManagers.putIfAbsent(context, possibleNew)) 
== null)
+            if ((session = sessions.putIfAbsent(context, possibleNew)) == null)
             {
                 session = possibleNew;
-                sourceHosts.put(context.host, context);
             }
         }
         return session;
     }
 
-    public void addFilesToRequest(List<PendingFile> pendingFiles)
+    // FIXME hack for "initiated" streams.  replace w/ integration w/ 
pendingfiles
+    public void addActiveStream(PendingFile file)
+    {
+        activeStreams.add(file);
+    }
+
+    public void removeActiveStream(PendingFile file)
+    {
+        activeStreams.remove(file);
+    }
+
+    public void addFilesToRequest(List<PendingFile> files)
     {
-        for(PendingFile file : pendingFiles)
+        for(PendingFile file : files)
         {
             if(logger.isDebugEnabled())
                 logger.debug("Adding file {} to Stream Request queue", 
file.getFilename());
@@ -82,41 +101,61 @@ public class StreamInSession
     {
         pendingFiles.remove(lastFile);
         if (pendingFiles.size() > 0)
-            StreamIn.requestFile(context, pendingFiles.get(0));
+            requestFile(pendingFiles.get(0));
         else
         {
             if (StorageService.instance.isBootstrapMode())
-                StorageService.instance.removeBootstrapSource(context.host, 
lastFile.desc.ksname);
+                StorageService.instance.removeBootstrapSource(getHost(), 
lastFile.desc.ksname);
             remove();
         }
     }
     
     public void remove()
     {
-        if (streamManagers.containsKey(context))
-            streamManagers.remove(context);
-        sourceHosts.remove(context.host, context);
+        sessions.remove(context);
+    }
+
+    public void requestFile(PendingFile file)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Requesting file {} from source {}", 
file.getFilename(), getHost());
+        Message message = new 
StreamRequestMessage(FBUtilities.getLocalAddress(), file, 
getSessionId()).makeMessage();
+        MessagingService.instance.sendOneWay(message, getHost());
+    }
+
+    public long getSessionId()
+    {
+        return context.sessionId;
+    }
+
+    public InetAddress getHost()
+    {
+        return context.host;
     }
 
     /** query method to determine which hosts are streaming to this node. */
-    public static Set<StreamContext> getSources()
+    public static Set<InetAddress> getSources()
     {
-        HashSet<StreamContext> set = new HashSet<StreamContext>();
-        set.addAll(streamManagers.keySet());
-        set.addAll(activeStreams.keySet());
+        HashSet<InetAddress> set = new HashSet<InetAddress>();
+        for (StreamInSession session : sessions.values())
+        {
+            set.add(session.getHost());
+        }
         return set;
     }
 
     /** query the status of incoming files. */
     public static List<PendingFile> getIncomingFiles(InetAddress host)
     {
-        // avoid returning null.
         List<PendingFile> list = new ArrayList<PendingFile>();
-        for (StreamContext context : sourceHosts.get(host))
+        for (Map.Entry<StreamContext, StreamInSession> entry : 
sessions.entrySet())
         {
-            if (streamManagers.containsKey(context))
-                list.addAll(streamManagers.get(context).pendingFiles);
-            list.addAll(activeStreams.get(context));
+            if (entry.getKey().host.equals(host))
+            {
+                StreamInSession session = entry.getValue();
+                list.addAll(session.pendingFiles);
+                list.addAll(session.activeStreams);
+            }
         }
         return list;
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Tue 
Sep 14 22:25:53 2010
@@ -66,17 +66,16 @@ public class StreamOut
     {
         assert ranges.size() > 0;
         
-        StreamContext context = new StreamContext(target);
         // this is so that this target shows up as a destination while 
anticompaction is happening.
-        StreamOutSession.get(context);
+        StreamOutSession session = StreamOutSession.create(target);
 
-        logger.info("Beginning transfer process to {} for ranges {}", context, 
StringUtils.join(ranges, ", "));
+        logger.info("Beginning transfer process to {} for ranges {}", target, 
StringUtils.join(ranges, ", "));
 
         try
         {
             Table table = flushSSTable(tableName);
             // send the matching portion of every sstable in the keyspace
-            transferSSTables(context, tableName, table.getAllSSTables(), 
ranges);
+            transferSSTables(session, tableName, table.getAllSSTables(), 
ranges);
         }
         catch (IOException e)
         {
@@ -84,7 +83,7 @@ public class StreamOut
         }
         finally
         {
-            StreamOutSession.remove(context);
+            session.close();
         }
         if (callback != null)
             callback.run();
@@ -120,17 +119,17 @@ public class StreamOut
     /**
      * Split out files for all tables on disk locally for each range and then 
stream them to the target endpoint.
     */
-    public static void transferRangesForRequest(StreamContext context, String 
tableName, Collection<Range> ranges, Runnable callback)
+    public static void transferRangesForRequest(StreamOutSession session, 
String tableName, Collection<Range> ranges, Runnable callback)
     {
         assert ranges.size() > 0;
 
-        logger.info("Beginning transfer process to {} for ranges {}", context, 
StringUtils.join(ranges, ", "));
+        logger.info("Beginning transfer process to {} for ranges {}", 
session.getHost(), StringUtils.join(ranges, ", "));
 
         try
         {
             Table table = flushSSTable(tableName);
             // send the matching portion of every sstable in the keyspace
-            transferSSTablesForRequest(context, tableName, 
table.getAllSSTables(), ranges);
+            transferSSTablesForRequest(session, tableName, 
table.getAllSSTables(), ranges);
         }
         catch (IOException e)
         {
@@ -144,21 +143,21 @@ public class StreamOut
     /**
      * Transfers matching portions of a group of sstables from a single table 
to the target endpoint.
      */
-    public static void transferSSTables(StreamContext context, String table, 
Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
+    public static void transferSSTables(StreamOutSession session, String 
table, Collection<SSTableReader> sstables, Collection<Range> ranges) throws 
IOException
     {
         List<PendingFile> pending = createPendingFiles(sstables, ranges);
 
         if (pending.size() > 0)
         {
-            StreamHeader header = new StreamHeader(context.sessionId, 
pending.get(0), true);
-            StreamOutSession.get(context).addFilesToStream(pending);
+            StreamHeader header = new StreamHeader(session.getSessionId(), 
pending.get(0), true);
+            session.addFilesToStream(pending);
 
-            logger.info("Streaming file {} to {}", header.getStreamFile(), 
context.host);
-            MessagingService.instance.stream(header, context.host);
+            logger.info("Streaming file {} to {}", header.getStreamFile(), 
session.getHost());
+            MessagingService.instance.stream(header, session.getHost());
 
-            logger.info("Waiting for transfer to {} to complete", context);
-            StreamOutSession.get(context).waitForStreamCompletion();
-            logger.info("Done with transfer to {}", context);
+            logger.info("Waiting for transfer to {} to complete", 
session.getHost());
+            session.waitForStreamCompletion();
+            logger.info("Done with transfer to {}", session.getHost());
         }
     }
 
@@ -166,27 +165,27 @@ public class StreamOut
      * Transfers the first file for matching portions of a group of sstables 
and appends a list of other files
      * to the header for the requesting destination to take control of the 
rest of the transfers
      */
-    private static void transferSSTablesForRequest(StreamContext context, 
String table, Collection<SSTableReader> sstables, Collection<Range> ranges) 
throws IOException
+    private static void transferSSTablesForRequest(StreamOutSession session, 
String table, Collection<SSTableReader> sstables, Collection<Range> ranges) 
throws IOException
     {
         List<PendingFile> pending = createPendingFiles(sstables, ranges);
         if (pending.size() > 0)
         {
-            StreamHeader header = new StreamHeader(context.sessionId, 
pending.get(0), pending, false);
+            StreamHeader header = new StreamHeader(session.getSessionId(), 
pending.get(0), pending, false);
             // In case this happens to be a re-request due to some error 
condition on the destination side
-            if (StreamOutSession.getPendingFiles(context).size() == 0)
-                StreamOutSession.get(context).addFilesToStream(pending);
+            if (session.getFiles().isEmpty())
+                session.addFilesToStream(pending);
 
-            logger.info("Streaming file {} to {}", header.getStreamFile(), 
context.host);
-            MessagingService.instance.stream(header, context.host);
-            
StreamOutSession.get(context).removePending(header.getStreamFile());
+            logger.info("Streaming file {} to {}", header.getStreamFile(), 
session.getHost());
+            MessagingService.instance.stream(header, session.getHost());
+            session.removePending(header.getStreamFile());
         }
         else
         {
-            FileStatus status = new FileStatus("", context.sessionId);
+            FileStatus status = new FileStatus("", session.getSessionId());
             status.setAction(FileStatus.Action.EMPTY);
             Message message = status.makeStreamStatusMessage();
             message.setHeader(StreamOut.TABLE_NAME, table.getBytes());
-            MessagingService.instance.sendOneWay(message, context.host);
+            MessagingService.instance.sendOneWay(message, session.getHost());
         }
     }
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java 
Tue Sep 14 22:25:53 2010
@@ -20,90 +20,47 @@ package org.apache.cassandra.streaming;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.SimpleCondition;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
  * This class manages the streaming of multiple files one after the other.
 */
 public class StreamOutSession
 {   
-    private static Logger logger = LoggerFactory.getLogger( 
StreamOutSession.class );
+    private static final Logger logger = LoggerFactory.getLogger( 
StreamOutSession.class );
         
-    // one host may have multiple stream contexts (think of them as sessions). 
each context gets its own manager.
-    private static ConcurrentMap<StreamContext, StreamOutSession> 
streamManagers = new ConcurrentHashMap<StreamContext, StreamOutSession>();
-    public static final Multimap<InetAddress, StreamContext> destHosts = 
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, 
StreamContext>create());
-
-    public static StreamOutSession get(StreamContext context)
-    {
-        StreamOutSession session = streamManagers.get(context);
-        if (session == null)
-        {
-            StreamOutSession possibleNew = new StreamOutSession(context);
-            if ((session = streamManagers.putIfAbsent(context, possibleNew)) 
== null)
-            {
-                session = possibleNew;
-                destHosts.put(context.host, context);
-            }
-        }
-        return session;
-    }
-    
-    public static void remove(StreamContext context)
+    // one host may have multiple stream sessions.
+    private static final ConcurrentMap<StreamContext, StreamOutSession> 
streams = new NonBlockingHashMap<StreamContext, StreamOutSession>();
+
+    public static StreamOutSession create(InetAddress host)
     {
-        if (streamManagers.containsKey(context) && 
streamManagers.get(context).files.size() == 0)
-        {
-            streamManagers.remove(context);
-            destHosts.remove(context.host, context);
-        }
+        return create(host, System.nanoTime());
     }
 
-    public static Set<InetAddress> getDestinations()
+    public static StreamOutSession create(InetAddress host, long sessionId)
     {
-        // the results of streamManagers.keySet() isn't serializable, so 
create a new set.
-        Set<InetAddress> hosts = new HashSet<InetAddress>();
-        hosts.addAll(destHosts.keySet());
-        return hosts;
+        StreamContext context = new StreamContext(host, sessionId);
+        StreamOutSession session = new StreamOutSession(context);
+        streams.put(context, session);
+        return session;
     }
-    
-    /** 
-     * this method exists so that we don't have to call StreamOutManager.get() 
which has a nasty side-effect of 
-     * indicating that we are streaming to a particular host.
-     **/     
-    public static List<PendingFile> getPendingFiles(StreamContext context)
+
+    public static StreamOutSession get(InetAddress host, long sessionId)
     {
-        List<PendingFile> list = new ArrayList<PendingFile>();
-        StreamOutSession session = streamManagers.get(context);
-        if (session != null)
-            list.addAll(session.getFiles());
-        return list;
+        return streams.get(new StreamContext(host, sessionId));
     }
 
-    public static List<PendingFile> getOutgoingFiles(InetAddress host)
+    public void close()
     {
-        List<PendingFile> list = new ArrayList<PendingFile>();
-        for(StreamContext context : destHosts.get(host))
-        {
-            list.addAll(getPendingFiles(context));
-        }
-        return list;
+        streams.remove(context);
     }
 
     // we need sequential and random access to the files. hence, the map and 
the list.
@@ -117,6 +74,16 @@ public class StreamOutSession
     {
         this.context = context;
     }
+
+    public InetAddress getHost()
+    {
+        return context.host;
+    }
+
+    public long getSessionId()
+    {
+        return context.sessionId;
+    }
     
     public void addFilesToStream(List<PendingFile> pendingFiles)
     {
@@ -142,7 +109,7 @@ public class StreamOutSession
     {
         if (logger.isDebugEnabled())
             logger.debug("Streaming {} ...", pf);
-        MessagingService.instance.stream(new StreamHeader(context.sessionId, 
pf, true), context.host);
+        MessagingService.instance.stream(new StreamHeader(getSessionId(), pf, 
true), getHost());
     }
 
     public void finishAndStartNext(String pfname) throws IOException
@@ -150,23 +117,25 @@ public class StreamOutSession
         PendingFile pf = fileMap.remove(pfname);
         files.remove(pf);
 
-        if (files.size() > 0)
-            streamFile(files.get(0));
-        else
+        if (files.isEmpty())
         {
             if (logger.isDebugEnabled())
-                logger.debug("Signalling that streaming is done for {} session 
{}", context.host, context.sessionId);
-            remove(context);
+                logger.debug("Signalling that streaming is done for {} session 
{}", getHost(), getSessionId());
+            close();
             condition.signalAll();
         }
+        else
+        {
+            streamFile(files.get(0));
+        }
     }
 
     public void removePending(PendingFile pf)
     {
         files.remove(pf);
         fileMap.remove(pf.getFilename());
-        if (files.size() == 0)
-            remove(context);
+        if (files.isEmpty())
+            close();
     }
 
     public void waitForStreamCompletion()
@@ -185,4 +154,25 @@ public class StreamOutSession
     {
         return Collections.unmodifiableList(files);
     }
+
+    public static Set<InetAddress> getDestinations()
+    {
+        Set<InetAddress> hosts = new HashSet<InetAddress>();
+        for (StreamOutSession session : streams.values())
+        {
+            hosts.add(session.getHost());
+        }
+        return hosts;
+    }
+
+    public static List<PendingFile> getOutgoingFiles(InetAddress host)
+    {
+        List<PendingFile> list = new ArrayList<PendingFile>();
+        for (Map.Entry<StreamContext, StreamOutSession> entry : streams.entrySet())
+        {
+            if (entry.getKey().host.equals(host))
+                list.addAll(entry.getValue().getFiles());
+        }
+        return list;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Tue Sep 14 22:25:53 2010
@@ -54,15 +54,16 @@ public class StreamRequestVerbHandler im
 
             if (srm.file != null)
             {
-                // single file request.
+                // single file re-request.
                 StreamHeader header = new StreamHeader(srm.sessionId, srm.file, false);
                 MessagingService.instance.stream(header, message.getFrom());
-                StreamOutSession.get(new StreamContext(message.getFrom(), srm.sessionId)).removePending(srm.file);
+                StreamOutSession.get(message.getFrom(), srm.sessionId).removePending(srm.file);
             }
             else
             {
                 // range request.
-                StreamOut.transferRangesForRequest(new StreamContext(message.getFrom(), srm.sessionId), srm.table, srm.ranges, null);
+                StreamOutSession session = StreamOutSession.create(message.getFrom(), srm.sessionId);
+                StreamOut.transferRangesForRequest(session, srm.table, srm.ranges, null);
             }
         }
         catch (IOException ex)

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java Tue Sep 14 22:25:53 2010
@@ -45,20 +45,20 @@ public class StreamStatusVerbHandler imp
         try
         {
             FileStatus streamStatus = FileStatus.serializer().deserialize(new DataInputStream(bufIn));
-            StreamContext context = new StreamContext(message.getFrom(), streamStatus.getSessionId());
+            StreamOutSession session = StreamOutSession.get(message.getFrom(), streamStatus.getSessionId());
 
             switch (streamStatus.getAction())
             {
                 case DELETE:
-                    StreamOutSession.get(context).finishAndStartNext(streamStatus.getFile());
+                    session.finishAndStartNext(streamStatus.getFile());
                     break;
                 case STREAM:
                     logger.warn("Need to re-stream file {} to {}", streamStatus.getFile(), message.getFrom());
-                    StreamOutSession.get(context).retry(streamStatus.getFile());
+                    session.retry(streamStatus.getFile());
                     break;
                 case EMPTY:
                     logger.error("Did not find matching ranges on {}", message.getFrom());
-                    StreamInSession.get(context).remove();
+                    StreamInSession.get(message.getFrom(), streamStatus.getSessionId()).remove();
                     if (StorageService.instance.isBootstrapMode())
                         StorageService.instance.removeBootstrapSource(message.getFrom(), new String(message.getHeader(StreamOut.TABLE_NAME)));
                     break;

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java Tue Sep 14 22:25:53 2010
@@ -54,10 +54,10 @@ public class StreamingService implements
     {
         StringBuilder sb = new StringBuilder();
         sb.append("Receiving from:\n");
-        for (StreamContext source : StreamInSession.getSources())
+        for (InetAddress source : StreamInSession.getSources())
         {
-            sb.append(String.format(" %s:\n", source.host.getHostAddress()));
-            for (PendingFile pf : StreamInSession.getIncomingFiles(source.host))
+            sb.append(String.format(" %s:\n", source.getHostAddress()));
+            for (PendingFile pf : StreamInSession.getIncomingFiles(source))
             {
                 sb.append(String.format("  %s\n", pf.toString()));
             }
@@ -99,13 +99,7 @@ public class StreamingService implements
     /** hosts sending incoming streams */
     public Set<InetAddress> getStreamSources()
     {
-        Set<InetAddress> sources = new HashSet<InetAddress>();
-
-        for(StreamContext context : StreamInSession.getSources())
-        {
-            sources.add(context.host);
-        }
-        return sources;
+        return StreamInSession.getSources();
     }
 
     /** details about incoming streams. */

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Tue Sep 14 22:25:53 2010
@@ -34,8 +34,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.SSTableUtils;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamContext;
-import org.apache.cassandra.streaming.StreamOut;
 import org.apache.cassandra.utils.FBUtilities;
 
 import org.junit.BeforeClass;
@@ -68,7 +66,7 @@ public class StreamingTransferTest exten
         List<Range> ranges = new ArrayList<Range>();
         ranges.add(new Range(p.getMinimumToken(), p.getToken("key".getBytes())));
         ranges.add(new Range(p.getToken("key2".getBytes()), p.getMinimumToken()));
-        StreamOut.transferSSTables(new StreamContext(LOCAL), tablename, Arrays.asList(sstable), ranges);
+        StreamOut.transferSSTables(StreamOutSession.create(LOCAL), tablename, Arrays.asList(sstable), ranges);
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);
@@ -108,7 +106,7 @@ public class StreamingTransferTest exten
         List<Range> ranges = new ArrayList<Range>();
         ranges.add(new Range(p.getMinimumToken(), p.getToken("transfer1".getBytes())));
         ranges.add(new Range(p.getToken("test2".getBytes()), p.getMinimumToken()));
-        StreamOut.transferSSTables(new StreamContext(LOCAL), tablename, Arrays.asList(sstable, sstable2), ranges);
+        StreamOut.transferSSTables(StreamOutSession.create(LOCAL), tablename, Arrays.asList(sstable, sstable2), ranges);
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);


Reply via email to