Author: jbellis
Date: Tue Sep 14 22:26:00 2010
New Revision: 997120

URL: http://svn.apache.org/viewvc?rev=997120&view=rev
Log:
replace StreamContext with Pair
patch by jbellis; reviewed by Nick Bailey for CASSANDRA-1504


Removed:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContext.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=997120&r1=997119&r2=997120&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Sep 14 22:26:00 2010
@@ -20,7 +20,6 @@ package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.cassandra.net.Message;
@@ -28,6 +27,7 @@ import org.apache.cassandra.net.Messagin
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.apache.cassandra.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,20 +37,20 @@ public class StreamInSession
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
 
-    private static ConcurrentMap<StreamContext, StreamInSession> sessions = new NonBlockingHashMap<StreamContext, StreamInSession>();
+    private static ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>();
     private final Set<PendingFile> activeStreams = new HashSet<PendingFile>();
 
     private final List<PendingFile> pendingFiles = new ArrayList<PendingFile>();
-    private final StreamContext context;
+    private final Pair<InetAddress, Long> context;
 
-    private StreamInSession(StreamContext context)
+    private StreamInSession(Pair<InetAddress, Long> context)
     {
         this.context = context;
     }
 
     public static StreamInSession create(InetAddress host)
     {
-        StreamContext context = new StreamContext(host);
+        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime());
         StreamInSession session = new StreamInSession(context);
         sessions.put(context, session);
         return session;
@@ -58,7 +58,7 @@ public class StreamInSession
 
     public static StreamInSession get(InetAddress host, long sessionId)
     {
-        StreamContext context = new StreamContext(host, sessionId);
+        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
 
         StreamInSession session = sessions.get(context);
         if (session == null)
@@ -125,12 +125,12 @@ public class StreamInSession
 
     public long getSessionId()
     {
-        return context.sessionId;
+        return context.right;
     }
 
     public InetAddress getHost()
     {
-        return context.host;
+        return context.left;
     }
 
     /** query method to determine which hosts are streaming to this node. */
@@ -148,9 +148,9 @@ public class StreamInSession
     public static List<PendingFile> getIncomingFiles(InetAddress host)
     {
         List<PendingFile> list = new ArrayList<PendingFile>();
-        for (Map.Entry<StreamContext, StreamInSession> entry : sessions.entrySet())
+        for (Map.Entry<Pair<InetAddress, Long>, StreamInSession> entry : sessions.entrySet())
         {
-            if (entry.getKey().host.equals(host))
+            if (entry.getKey().left.equals(host))
             {
                 StreamInSession session = entry.getValue();
                 list.addAll(session.pendingFiles);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=997120&r1=997119&r2=997120&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java Tue Sep 14 22:26:00 2010
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -38,7 +39,7 @@ public class StreamOutSession
     private static final Logger logger = LoggerFactory.getLogger( StreamOutSession.class );
         
     // one host may have multiple stream sessions.
-    private static final ConcurrentMap<StreamContext, StreamOutSession> streams = new NonBlockingHashMap<StreamContext, StreamOutSession>();
+    private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession> streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
 
     public static StreamOutSession create(InetAddress host)
     {
@@ -47,7 +48,7 @@ public class StreamOutSession
 
     public static StreamOutSession create(InetAddress host, long sessionId)
     {
-        StreamContext context = new StreamContext(host, sessionId);
+        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
         StreamOutSession session = new StreamOutSession(context);
         streams.put(context, session);
         return session;
@@ -55,7 +56,7 @@ public class StreamOutSession
 
     public static StreamOutSession get(InetAddress host, long sessionId)
     {
-        return streams.get(new StreamContext(host, sessionId));
+        return streams.get(new Pair<InetAddress, Long>(host, sessionId));
     }
 
     public void close()
@@ -67,22 +68,22 @@ public class StreamOutSession
     private final List<PendingFile> files = new ArrayList<PendingFile>();
     private final Map<String, PendingFile> fileMap = new HashMap<String, PendingFile>();
     
-    private final StreamContext context;
+    private final Pair<InetAddress, Long> context;
     private final SimpleCondition condition = new SimpleCondition();
     
-    private StreamOutSession(StreamContext context)
+    private StreamOutSession(Pair<InetAddress, Long> context)
     {
         this.context = context;
     }
 
     public InetAddress getHost()
     {
-        return context.host;
+        return context.left;
     }
 
     public long getSessionId()
     {
-        return context.sessionId;
+        return context.right;
     }
     
     public void addFilesToStream(List<PendingFile> pendingFiles)
@@ -168,9 +169,9 @@ public class StreamOutSession
     public static List<PendingFile> getOutgoingFiles(InetAddress host)
     {
         List<PendingFile> list = new ArrayList<PendingFile>();
-        for (Map.Entry<StreamContext, StreamOutSession> entry : streams.entrySet())
+        for (Map.Entry<Pair<InetAddress, Long>, StreamOutSession> entry : streams.entrySet())
         {
-            if (entry.getKey().host.equals(host))
+            if (entry.getKey().left.equals(host))
                 list.addAll(entry.getValue().getFiles());
         }
         return list;


Reply via email to