Author: jbellis
Date: Tue Sep 14 22:25:31 2010
New Revision: 997118

URL: http://svn.apache.org/viewvc?rev=997118&view=rev
Log:
rename Manager -> Session
patch by jbellis for CASSANDRA-1504

Added:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
      - copied, changed from r997082, 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
      - copied, changed from r997082, 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
Removed:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
Modified:
    
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
Tue Sep 14 22:25:31 2010
@@ -48,7 +48,7 @@ import org.apache.cassandra.net.CompactE
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamOutManager;
+import org.apache.cassandra.streaming.StreamOutSession;
 import org.apache.cassandra.utils.*;
 
 /**
@@ -540,7 +540,7 @@ public class AntiEntropyService
                     {
                         StreamContext context = new 
StreamContext(request.endpoint);
                         StreamOut.transferSSTables(context, request.cf.left, 
sstables, ranges);
-                        StreamOutManager.remove(context);
+                        StreamOutSession.remove(context);
                     }
                 });
                 // request ranges from the remote node

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Tue Sep 14 22:25:31 2010
@@ -50,14 +50,14 @@ public class IncomingStreamReader
         lastFile = header.getStreamFile();
         initiatedTransfer = header.initiatedTransfer;
         context = new StreamContext(remoteAddress.getAddress(), 
header.getSessionId());
-        StreamInManager.activeStreams.put(context, pendingFile);
+        StreamInSession.activeStreams.put(context, pendingFile);
         assert pendingFile != null;
         // For transfers setup the status and for replies to requests, prepare 
the list
         // of available files to request.
         if (initiatedTransfer)
             streamStatus = new FileStatus(lastFile.getFilename(), 
header.getSessionId());
         else if (header.getPendingFiles() != null)
-            
StreamInManager.get(context).addFilesToRequest(header.getPendingFiles());
+            
StreamInSession.get(context).addFilesToRequest(header.getPendingFiles());
     }
 
     public void read() throws IOException
@@ -98,7 +98,7 @@ public class IncomingStreamReader
         finally
         {
             fc.close();
-            StreamInManager.activeStreams.remove(context, pendingFile);
+            StreamInSession.activeStreams.remove(context, pendingFile);
         }
 
         if (logger.isDebugEnabled())
@@ -108,7 +108,7 @@ public class IncomingStreamReader
         else
         {
             FileStatusHandler.addSSTable(pendingFile);
-            StreamInManager.get(context).finishAndRequestNext(lastFile);
+            StreamInSession.get(context).finishAndRequestNext(lastFile);
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Tue 
Sep 14 22:25:31 2010
@@ -52,7 +52,7 @@ public class StreamIn
         if (logger.isDebugEnabled())
             logger.debug("Requesting from {} ranges {}", source, 
StringUtils.join(ranges, ", "));
         StreamContext context = new StreamContext(source);
-        StreamInManager.get(context);
+        StreamInSession.get(context);
         Message message = new 
StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName, 
context.sessionId).makeMessage();
         MessagingService.instance.sendOneWay(message, source);
     }

Copied: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java 
(from r997082, 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java)
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?p2=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java&p1=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java&r1=997082&r2=997118&rev=997118&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java 
Tue Sep 14 22:25:31 2010
@@ -32,36 +32,36 @@ import org.apache.cassandra.service.Stor
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** each context gets its own StreamInManager. So there may be >1 
StreamInManager per host */
-public class StreamInManager
+/** each context gets its own StreamInSession. So there may be >1 
StreamInSession per host */
+public class StreamInSession
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(StreamInManager.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamInSession.class);
 
-    private static ConcurrentMap<StreamContext, StreamInManager> 
streamManagers = new ConcurrentHashMap<StreamContext, StreamInManager>(0);
+    private static ConcurrentMap<StreamContext, StreamInSession> 
streamManagers = new ConcurrentHashMap<StreamContext, StreamInSession>(0);
     public static final Multimap<StreamContext, PendingFile> activeStreams = 
Multimaps.synchronizedMultimap(HashMultimap.<StreamContext, 
PendingFile>create());
     public static final Multimap<InetAddress, StreamContext> sourceHosts = 
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, 
StreamContext>create());
     
     private final List<PendingFile> pendingFiles = new 
ArrayList<PendingFile>();
     private final StreamContext context;
 
-    private StreamInManager(StreamContext context)
+    private StreamInSession(StreamContext context)
     {
         this.context = context;
     }
 
-    public synchronized static StreamInManager get(StreamContext context)
+    public synchronized static StreamInSession get(StreamContext context)
     {
-        StreamInManager manager = streamManagers.get(context);
-        if (manager == null)
+        StreamInSession session = streamManagers.get(context);
+        if (session == null)
         {
-            StreamInManager possibleNew = new StreamInManager(context);
-            if ((manager = streamManagers.putIfAbsent(context, possibleNew)) 
== null)
+            StreamInSession possibleNew = new StreamInSession(context);
+            if ((session = streamManagers.putIfAbsent(context, possibleNew)) 
== null)
             {
-                manager = possibleNew;
+                session = possibleNew;
                 sourceHosts.put(context.host, context);
             }
         }
-        return manager;
+        return session;
     }
 
     public void addFilesToRequest(List<PendingFile> pendingFiles)

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Tue 
Sep 14 22:25:31 2010
@@ -68,7 +68,7 @@ public class StreamOut
         
         StreamContext context = new StreamContext(target);
         // this is so that this target shows up as a destination while 
anticompaction is happening.
-        StreamOutManager.get(context);
+        StreamOutSession.get(context);
 
         logger.info("Beginning transfer process to {} for ranges {}", context, 
StringUtils.join(ranges, ", "));
 
@@ -84,7 +84,7 @@ public class StreamOut
         }
         finally
         {
-            StreamOutManager.remove(context);
+            StreamOutSession.remove(context);
         }
         if (callback != null)
             callback.run();
@@ -151,13 +151,13 @@ public class StreamOut
         if (pending.size() > 0)
         {
             StreamHeader header = new StreamHeader(context.sessionId, 
pending.get(0), true);
-            StreamOutManager.get(context).addFilesToStream(pending);
+            StreamOutSession.get(context).addFilesToStream(pending);
 
             logger.info("Streaming file {} to {}", header.getStreamFile(), 
context.host);
             MessagingService.instance.stream(header, context.host);
 
             logger.info("Waiting for transfer to {} to complete", context);
-            StreamOutManager.get(context).waitForStreamCompletion();
+            StreamOutSession.get(context).waitForStreamCompletion();
             logger.info("Done with transfer to {}", context);
         }
     }
@@ -173,12 +173,12 @@ public class StreamOut
         {
             StreamHeader header = new StreamHeader(context.sessionId, 
pending.get(0), pending, false);
             // In case this happens to be a re-request due to some error 
condition on the destination side
-            if (StreamOutManager.getPendingFiles(context).size() == 0)
-                StreamOutManager.get(context).addFilesToStream(pending);
+            if (StreamOutSession.getPendingFiles(context).size() == 0)
+                StreamOutSession.get(context).addFilesToStream(pending);
 
             logger.info("Streaming file {} to {}", header.getStreamFile(), 
context.host);
             MessagingService.instance.stream(header, context.host);
-            
StreamOutManager.get(context).removePending(header.getStreamFile());
+            
StreamOutSession.get(context).removePending(header.getStreamFile());
         }
         else
         {

Copied: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java 
(from r997082, 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java)
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?p2=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java&p1=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java&r1=997082&r2=997118&rev=997118&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java 
Tue Sep 14 22:25:31 2010
@@ -43,27 +43,27 @@ import com.google.common.collect.Multima
 /**
  * This class manages the streaming of multiple files one after the other.
 */
-public class StreamOutManager
+public class StreamOutSession
 {   
-    private static Logger logger = LoggerFactory.getLogger( 
StreamOutManager.class );
+    private static Logger logger = LoggerFactory.getLogger( 
StreamOutSession.class );
         
     // one host may have multiple stream contexts (think of them as sessions). 
each context gets its own manager.
-    private static ConcurrentMap<StreamContext, StreamOutManager> 
streamManagers = new ConcurrentHashMap<StreamContext, StreamOutManager>();
+    private static ConcurrentMap<StreamContext, StreamOutSession> 
streamManagers = new ConcurrentHashMap<StreamContext, StreamOutSession>();
     public static final Multimap<InetAddress, StreamContext> destHosts = 
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, 
StreamContext>create());
 
-    public static StreamOutManager get(StreamContext context)
+    public static StreamOutSession get(StreamContext context)
     {
-        StreamOutManager manager = streamManagers.get(context);
-        if (manager == null)
+        StreamOutSession session = streamManagers.get(context);
+        if (session == null)
         {
-            StreamOutManager possibleNew = new StreamOutManager(context);
-            if ((manager = streamManagers.putIfAbsent(context, possibleNew)) 
== null)
+            StreamOutSession possibleNew = new StreamOutSession(context);
+            if ((session = streamManagers.putIfAbsent(context, possibleNew)) 
== null)
             {
-                manager = possibleNew;
+                session = possibleNew;
                 destHosts.put(context.host, context);
             }
         }
-        return manager;
+        return session;
     }
     
     public static void remove(StreamContext context)
@@ -90,9 +90,9 @@ public class StreamOutManager
     public static List<PendingFile> getPendingFiles(StreamContext context)
     {
         List<PendingFile> list = new ArrayList<PendingFile>();
-        StreamOutManager manager = streamManagers.get(context);
-        if (manager != null)
-            list.addAll(manager.getFiles());
+        StreamOutSession session = streamManagers.get(context);
+        if (session != null)
+            list.addAll(session.getFiles());
         return list;
     }
 
@@ -113,7 +113,7 @@ public class StreamOutManager
     private final StreamContext context;
     private final SimpleCondition condition = new SimpleCondition();
     
-    private StreamOutManager(StreamContext context)
+    private StreamOutSession(StreamContext context)
     {
         this.context = context;
     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
 Tue Sep 14 22:25:31 2010
@@ -57,7 +57,7 @@ public class StreamRequestVerbHandler im
                 // single file request.
                 StreamHeader header = new StreamHeader(srm.sessionId, 
srm.file, false);
                 MessagingService.instance.stream(header, message.getFrom());
-                StreamOutManager.get(new StreamContext(message.getFrom(), 
srm.sessionId)).removePending(srm.file);
+                StreamOutSession.get(new StreamContext(message.getFrom(), 
srm.sessionId)).removePending(srm.file);
             }
             else
             {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
 Tue Sep 14 22:25:31 2010
@@ -50,15 +50,15 @@ public class StreamStatusVerbHandler imp
             switch (streamStatus.getAction())
             {
                 case DELETE:
-                    
StreamOutManager.get(context).finishAndStartNext(streamStatus.getFile());
+                    
StreamOutSession.get(context).finishAndStartNext(streamStatus.getFile());
                     break;
                 case STREAM:
                     logger.warn("Need to re-stream file {} to {}", 
streamStatus.getFile(), message.getFrom());
-                    
StreamOutManager.get(context).retry(streamStatus.getFile());
+                    
StreamOutSession.get(context).retry(streamStatus.getFile());
                     break;
                 case EMPTY:
                     logger.error("Did not find matching ranges on {}", 
message.getFrom());
-                    StreamInManager.get(context).remove();
+                    StreamInSession.get(context).remove();
                     if (StorageService.instance.isBootstrapMode())
                         
StorageService.instance.removeBootstrapSource(message.getFrom(), new 
String(message.getHeader(StreamOut.TABLE_NAME)));
                     break;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java 
Tue Sep 14 22:25:31 2010
@@ -54,19 +54,19 @@ public class StreamingService implements
     {
         StringBuilder sb = new StringBuilder();
         sb.append("Receiving from:\n");
-        for (StreamContext source : StreamInManager.getSources())
+        for (StreamContext source : StreamInSession.getSources())
         {
             sb.append(String.format(" %s:\n", source.host.getHostAddress()));
-            for (PendingFile pf : 
StreamInManager.getIncomingFiles(source.host))
+            for (PendingFile pf : 
StreamInSession.getIncomingFiles(source.host))
             {
                 sb.append(String.format("  %s\n", pf.toString()));
             }
         }
         sb.append("Sending to:\n");
-        for (InetAddress dest : StreamOutManager.getDestinations())
+        for (InetAddress dest : StreamOutSession.getDestinations())
         {
             sb.append(String.format(" %s:\n", dest.getHostAddress()));
-            for (PendingFile pf : StreamOutManager.getOutgoingFiles(dest))
+            for (PendingFile pf : StreamOutSession.getOutgoingFiles(dest))
             {
                 sb.append(String.format("  %s\n", pf.toString()));
             }
@@ -77,7 +77,7 @@ public class StreamingService implements
     /** hosts receiving outgoing streams. */
     public Set<InetAddress> getStreamDestinations()
     {
-        return StreamOutManager.getDestinations();
+        return StreamOutSession.getDestinations();
     }
 
     /** outgoing streams */
@@ -91,7 +91,7 @@ public class StreamingService implements
         if (!existingDestinations.contains(dest))
             return files;
         
-        for (PendingFile f : StreamOutManager.getOutgoingFiles(dest))
+        for (PendingFile f : StreamOutSession.getOutgoingFiles(dest))
             files.add(String.format("%s", f.toString()));
         return files;
     }
@@ -101,7 +101,7 @@ public class StreamingService implements
     {
         Set<InetAddress> sources = new HashSet<InetAddress>();
 
-        for(StreamContext context : StreamInManager.getSources())
+        for(StreamContext context : StreamInSession.getSources())
         {
             sources.add(context.host);
         }
@@ -112,7 +112,7 @@ public class StreamingService implements
     public List<String> getIncomingFiles(String host) throws IOException
     {
         List<String> files = new ArrayList<String>();
-        for (PendingFile pf : 
StreamInManager.getIncomingFiles(InetAddress.getByName(host)))
+        for (PendingFile pf : 
StreamInSession.getIncomingFiles(InetAddress.getByName(host)))
         {
             files.add(String.format("%s: %s", pf.desc.ksname, pf.toString()));
         }


Reply via email to