Author: jbellis
Date: Tue Sep 14 22:26:00 2010
New Revision: 997120
URL: http://svn.apache.org/viewvc?rev=997120&view=rev
Log:
replace StreamContext with Pair
patch by jbellis; reviewed by Nick Bailey for CASSANDRA-1504
Removed:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContext.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=997120&r1=997119&r2=997120&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Sep 14 22:26:00 2010
@@ -20,7 +20,6 @@ package org.apache.cassandra.streaming;
import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.cassandra.net.Message;
@@ -28,6 +27,7 @@ import org.apache.cassandra.net.Messagin
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,20 +37,20 @@ public class StreamInSession
{
private static final Logger logger =
LoggerFactory.getLogger(StreamInSession.class);
- private static ConcurrentMap<StreamContext, StreamInSession> sessions = new NonBlockingHashMap<StreamContext, StreamInSession>();
+ private static ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>();
private final Set<PendingFile> activeStreams = new HashSet<PendingFile>();
private final List<PendingFile> pendingFiles = new
ArrayList<PendingFile>();
- private final StreamContext context;
+ private final Pair<InetAddress, Long> context;
- private StreamInSession(StreamContext context)
+ private StreamInSession(Pair<InetAddress, Long> context)
{
this.context = context;
}
public static StreamInSession create(InetAddress host)
{
- StreamContext context = new StreamContext(host);
+ Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime());
StreamInSession session = new StreamInSession(context);
sessions.put(context, session);
return session;
@@ -58,7 +58,7 @@ public class StreamInSession
public static StreamInSession get(InetAddress host, long sessionId)
{
- StreamContext context = new StreamContext(host, sessionId);
+ Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
StreamInSession session = sessions.get(context);
if (session == null)
@@ -125,12 +125,12 @@ public class StreamInSession
public long getSessionId()
{
- return context.sessionId;
+ return context.right;
}
public InetAddress getHost()
{
- return context.host;
+ return context.left;
}
/** query method to determine which hosts are streaming to this node. */
@@ -148,9 +148,9 @@ public class StreamInSession
public static List<PendingFile> getIncomingFiles(InetAddress host)
{
List<PendingFile> list = new ArrayList<PendingFile>();
- for (Map.Entry<StreamContext, StreamInSession> entry : sessions.entrySet())
+ for (Map.Entry<Pair<InetAddress, Long>, StreamInSession> entry : sessions.entrySet())
{
- if (entry.getKey().host.equals(host))
+ if (entry.getKey().left.equals(host))
{
StreamInSession session = entry.getValue();
list.addAll(session.pendingFiles);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=997120&r1=997119&r2=997120&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java Tue Sep 14 22:26:00 2010
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SimpleCondition;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -38,7 +39,7 @@ public class StreamOutSession
private static final Logger logger = LoggerFactory.getLogger(
StreamOutSession.class );
// one host may have multiple stream sessions.
- private static final ConcurrentMap<StreamContext, StreamOutSession> streams = new NonBlockingHashMap<StreamContext, StreamOutSession>();
+ private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession> streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
public static StreamOutSession create(InetAddress host)
{
@@ -47,7 +48,7 @@ public class StreamOutSession
public static StreamOutSession create(InetAddress host, long sessionId)
{
- StreamContext context = new StreamContext(host, sessionId);
+ Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
StreamOutSession session = new StreamOutSession(context);
streams.put(context, session);
return session;
@@ -55,7 +56,7 @@ public class StreamOutSession
public static StreamOutSession get(InetAddress host, long sessionId)
{
- return streams.get(new StreamContext(host, sessionId));
+ return streams.get(new Pair<InetAddress, Long>(host, sessionId));
}
public void close()
@@ -67,22 +68,22 @@ public class StreamOutSession
private final List<PendingFile> files = new ArrayList<PendingFile>();
private final Map<String, PendingFile> fileMap = new HashMap<String,
PendingFile>();
- private final StreamContext context;
+ private final Pair<InetAddress, Long> context;
private final SimpleCondition condition = new SimpleCondition();
- private StreamOutSession(StreamContext context)
+ private StreamOutSession(Pair<InetAddress, Long> context)
{
this.context = context;
}
public InetAddress getHost()
{
- return context.host;
+ return context.left;
}
public long getSessionId()
{
- return context.sessionId;
+ return context.right;
}
public void addFilesToStream(List<PendingFile> pendingFiles)
@@ -168,9 +169,9 @@ public class StreamOutSession
public static List<PendingFile> getOutgoingFiles(InetAddress host)
{
List<PendingFile> list = new ArrayList<PendingFile>();
- for (Map.Entry<StreamContext, StreamOutSession> entry : streams.entrySet())
+ for (Map.Entry<Pair<InetAddress, Long>, StreamOutSession> entry : streams.entrySet())
{
- if (entry.getKey().host.equals(host))
+ if (entry.getKey().left.equals(host))
list.addAll(entry.getValue().getFiles());
}
return list;