Author: jbellis
Date: Tue Sep 14 22:25:53 2010
New Revision: 997119
URL: http://svn.apache.org/viewvc?rev=997119&view=rev
Log:
avoid exposing StreamContext outside the Session managers, and make the
Sessions the handle for streaming work. also removes unnecessary extra
Collections from the session objects.
patch by jbellis; reviewed by Nick Bailey for CASSANDRA-1504
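
With this change, code that streams data out creates a StreamOutSession, hands it to StreamOut, and closes it when done, instead of passing a StreamContext around. A minimal sketch of that pattern follows (the wrapper class and its arguments are placeholders for what callers such as AntiEntropyService supply); incoming transfers follow the same shape via StreamInSession.create()/get().

    import java.io.IOException;
    import java.net.InetAddress;
    import java.util.Collection;

    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.io.sstable.SSTableReader;
    import org.apache.cassandra.streaming.StreamOut;
    import org.apache.cassandra.streaming.StreamOutSession;

    // Hypothetical caller, used only to illustrate the session-as-handle API.
    public class TransferSketch
    {
        static void transfer(InetAddress target, String keyspace,
                             Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
        {
            // was: StreamContext context = new StreamContext(target)
            StreamOutSession session = StreamOutSession.create(target);
            StreamOut.transferSSTables(session, keyspace, sstables, ranges);
            // was: StreamOutSession.remove(context)
            session.close();
        }
    }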
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Tue Sep 14 22:25:53 2010
@@ -41,7 +41,6 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.AbstractCompactedRow;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.streaming.StreamContext;
import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
@@ -538,9 +537,9 @@ public class AntiEntropyService
{
protected void runMayThrow() throws Exception
{
- StreamContext context = new StreamContext(request.endpoint);
- StreamOut.transferSSTables(context, request.cf.left, sstables, ranges);
- StreamOutSession.remove(context);
+ StreamOutSession session = StreamOutSession.create(request.endpoint);
+ StreamOut.transferSSTables(session, request.cf.left, sstables, ranges);
+ session.close();
}
});
// request ranges from the remote node
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java Tue Sep 14 22:25:53 2010
@@ -38,13 +38,13 @@ class FileStatusHandler
{
private static Logger logger = LoggerFactory.getLogger(FileStatusHandler.class);
- public static void onStatusChange(StreamContext context, PendingFile pendingFile, FileStatus streamStatus) throws IOException
+ public static void onStatusChange(StreamInSession session, PendingFile pendingFile, FileStatus streamStatus) throws IOException
{
if (FileStatus.Action.STREAM == streamStatus.getAction())
{
// file needs to be restreamed
- logger.warn("Streaming of file {} from {} failed: requesting a
retry.", pendingFile, context);
-
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(),
context.host);
+ logger.warn("Streaming of file {} from {} failed: requesting a
retry.", pendingFile, session);
+
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(),
session.getHost());
return;
}
assert FileStatus.Action.DELETE == streamStatus.getAction() :
@@ -54,8 +54,8 @@ class FileStatusHandler
// send a StreamStatus message telling the source node it can delete this file
if (logger.isDebugEnabled())
- logger.debug("Sending a streaming finished message for {} to {}",
pendingFile, context);
-
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(),
context.host);
+ logger.debug("Sending a streaming finished message for {} to {}",
pendingFile, session);
+
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(),
session.getHost());
}
public static void addSSTable(PendingFile pendingFile)
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Tue Sep 14 22:25:53 2010
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.io.*;
+import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,14 +32,15 @@ import org.apache.cassandra.utils.Pair;
public class IncomingStreamReader
{
- private static Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
+ private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
+
private PendingFile pendingFile;
private PendingFile lastFile;
private FileStatus streamStatus;
- private SocketChannel socketChannel;
- private StreamContext context;
+ private final SocketChannel socketChannel;
// indicates an transfer initiated by the source, as opposed to one requested by the recipient
- private boolean initiatedTransfer;
+ private final boolean initiatedTransfer;
+ private final StreamInSession session;
public IncomingStreamReader(StreamHeader header, SocketChannel socketChannel) throws IOException
{
@@ -49,15 +51,15 @@ public class IncomingStreamReader
// lastFile has the old context, which was registered in the manager.
lastFile = header.getStreamFile();
initiatedTransfer = header.initiatedTransfer;
- context = new StreamContext(remoteAddress.getAddress(), header.getSessionId());
- StreamInSession.activeStreams.put(context, pendingFile);
assert pendingFile != null;
+ session = StreamInSession.get(remoteAddress.getAddress(), header.getSessionId());
+ session.addActiveStream(pendingFile);
// For transfers setup the status and for replies to requests, prepare the list
// of available files to request.
if (initiatedTransfer)
streamStatus = new FileStatus(lastFile.getFilename(), header.getSessionId());
else if (header.getPendingFiles() != null)
- StreamInSession.get(context).addFilesToRequest(header.getPendingFiles());
+ session.addFilesToRequest(header.getPendingFiles());
}
public void read() throws IOException
@@ -89,7 +91,7 @@ public class IncomingStreamReader
if (initiatedTransfer)
handleFileStatus(FileStatus.Action.STREAM);
else
- StreamIn.requestFile(context, lastFile);
+ session.requestFile(lastFile);
/* Delete the orphaned file. */
FileUtils.deleteWithConfirm(new File(pendingFile.getFilename()));
@@ -98,7 +100,7 @@ public class IncomingStreamReader
finally
{
fc.close();
- StreamInSession.activeStreams.remove(context, pendingFile);
+ session.removeActiveStream(pendingFile);
}
if (logger.isDebugEnabled())
@@ -108,13 +110,13 @@ public class IncomingStreamReader
else
{
FileStatusHandler.addSSTable(pendingFile);
- StreamInSession.get(context).finishAndRequestNext(lastFile);
+ session.finishAndRequestNext(lastFile);
}
}
private void handleFileStatus(FileStatus.Action action) throws IOException
{
streamStatus.setAction(action);
- FileStatusHandler.onStatusChange(context, pendingFile, streamStatus);
+ FileStatusHandler.onStatusChange(session, pendingFile, streamStatus);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Tue Sep 14 22:25:53 2010
@@ -51,24 +51,11 @@ public class StreamIn
if (logger.isDebugEnabled())
logger.debug("Requesting from {} ranges {}", source,
StringUtils.join(ranges, ", "));
- StreamContext context = new StreamContext(source);
- StreamInSession.get(context);
- Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName, context.sessionId).makeMessage();
+ StreamInSession session = StreamInSession.create(source);
+ Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName, session.getSessionId()).makeMessage();
MessagingService.instance.sendOneWay(message, source);
}
- /**
- * Request for transferring a single file. This happens subsequent of #requestRanges() being called.
- * @param file Pending File that needs to be transferred
- */
- public static void requestFile(StreamContext context, PendingFile file)
- {
- if (logger.isDebugEnabled())
- logger.debug("Requesting file {} from source {}", file.getFilename(), context.host);
- Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), file, context.sessionId).makeMessage();
- MessagingService.instance.sendOneWay(message, context.host);
- }
-
/** Translates remote files to local files by creating a local sstable per remote sstable. */
public static PendingFile getContextMapping(PendingFile remote) throws IOException
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Sep 14 22:25:53 2010
@@ -23,24 +23,23 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** each context gets its own StreamInSession. So there may be >1 StreamInManager per host */
+/** each context gets its own StreamInSession. So there may be >1 Session per host */
public class StreamInSession
{
private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
- private static ConcurrentMap<StreamContext, StreamInSession> streamManagers = new ConcurrentHashMap<StreamContext, StreamInSession>(0);
- public static final Multimap<StreamContext, PendingFile> activeStreams = Multimaps.synchronizedMultimap(HashMultimap.<StreamContext, PendingFile>create());
- public static final Multimap<InetAddress, StreamContext> sourceHosts = Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, StreamContext>create());
-
+ private static ConcurrentMap<StreamContext, StreamInSession> sessions = new NonBlockingHashMap<StreamContext, StreamInSession>();
+ private final Set<PendingFile> activeStreams = new HashSet<PendingFile>();
+
private final List<PendingFile> pendingFiles = new ArrayList<PendingFile>();
private final StreamContext context;
@@ -49,24 +48,44 @@ public class StreamInSession
this.context = context;
}
- public synchronized static StreamInSession get(StreamContext context)
+ public static StreamInSession create(InetAddress host)
+ {
+ StreamContext context = new StreamContext(host);
+ StreamInSession session = new StreamInSession(context);
+ sessions.put(context, session);
+ return session;
+ }
+
+ public static StreamInSession get(InetAddress host, long sessionId)
{
- StreamInSession session = streamManagers.get(context);
+ StreamContext context = new StreamContext(host, sessionId);
+
+ StreamInSession session = sessions.get(context);
if (session == null)
{
StreamInSession possibleNew = new StreamInSession(context);
- if ((session = streamManagers.putIfAbsent(context, possibleNew)) == null)
+ if ((session = sessions.putIfAbsent(context, possibleNew)) == null)
{
session = possibleNew;
- sourceHosts.put(context.host, context);
}
}
return session;
}
- public void addFilesToRequest(List<PendingFile> pendingFiles)
+ // FIXME hack for "initiated" streams. replace w/ integration w/
pendingfiles
+ public void addActiveStream(PendingFile file)
+ {
+ activeStreams.add(file);
+ }
+
+ public void removeActiveStream(PendingFile file)
+ {
+ activeStreams.remove(file);
+ }
+
+ public void addFilesToRequest(List<PendingFile> files)
{
- for(PendingFile file : pendingFiles)
+ for(PendingFile file : files)
{
if(logger.isDebugEnabled())
logger.debug("Adding file {} to Stream Request queue",
file.getFilename());
@@ -82,41 +101,61 @@ public class StreamInSession
{
pendingFiles.remove(lastFile);
if (pendingFiles.size() > 0)
- StreamIn.requestFile(context, pendingFiles.get(0));
+ requestFile(pendingFiles.get(0));
else
{
if (StorageService.instance.isBootstrapMode())
- StorageService.instance.removeBootstrapSource(context.host, lastFile.desc.ksname);
+ StorageService.instance.removeBootstrapSource(getHost(), lastFile.desc.ksname);
remove();
}
}
public void remove()
{
- if (streamManagers.containsKey(context))
- streamManagers.remove(context);
- sourceHosts.remove(context.host, context);
+ sessions.remove(context);
+ }
+
+ public void requestFile(PendingFile file)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Requesting file {} from source {}",
file.getFilename(), getHost());
+ Message message = new
StreamRequestMessage(FBUtilities.getLocalAddress(), file,
getSessionId()).makeMessage();
+ MessagingService.instance.sendOneWay(message, getHost());
+ }
+
+ public long getSessionId()
+ {
+ return context.sessionId;
+ }
+
+ public InetAddress getHost()
+ {
+ return context.host;
}
/** query method to determine which hosts are streaming to this node. */
- public static Set<StreamContext> getSources()
+ public static Set<InetAddress> getSources()
{
- HashSet<StreamContext> set = new HashSet<StreamContext>();
- set.addAll(streamManagers.keySet());
- set.addAll(activeStreams.keySet());
+ HashSet<InetAddress> set = new HashSet<InetAddress>();
+ for (StreamInSession session : sessions.values())
+ {
+ set.add(session.getHost());
+ }
return set;
}
/** query the status of incoming files. */
public static List<PendingFile> getIncomingFiles(InetAddress host)
{
- // avoid returning null.
List<PendingFile> list = new ArrayList<PendingFile>();
- for (StreamContext context : sourceHosts.get(host))
+ for (Map.Entry<StreamContext, StreamInSession> entry : sessions.entrySet())
{
- if (streamManagers.containsKey(context))
- list.addAll(streamManagers.get(context).pendingFiles);
- list.addAll(activeStreams.get(context));
+ if (entry.getKey().host.equals(host))
+ {
+ StreamInSession session = entry.getValue();
+ list.addAll(session.pendingFiles);
+ list.addAll(session.activeStreams);
+ }
}
return list;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Tue Sep 14 22:25:53 2010
@@ -66,17 +66,16 @@ public class StreamOut
{
assert ranges.size() > 0;
- StreamContext context = new StreamContext(target);
// this is so that this target shows up as a destination while anticompaction is happening.
- StreamOutSession.get(context);
+ StreamOutSession session = StreamOutSession.create(target);
- logger.info("Beginning transfer process to {} for ranges {}", context, StringUtils.join(ranges, ", "));
+ logger.info("Beginning transfer process to {} for ranges {}", target, StringUtils.join(ranges, ", "));
try
{
Table table = flushSSTable(tableName);
// send the matching portion of every sstable in the keyspace
- transferSSTables(context, tableName, table.getAllSSTables(), ranges);
+ transferSSTables(session, tableName, table.getAllSSTables(), ranges);
}
catch (IOException e)
{
@@ -84,7 +83,7 @@ public class StreamOut
}
finally
{
- StreamOutSession.remove(context);
+ session.close();
}
if (callback != null)
callback.run();
@@ -120,17 +119,17 @@ public class StreamOut
/**
* Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
*/
- public static void transferRangesForRequest(StreamContext context, String tableName, Collection<Range> ranges, Runnable callback)
+ public static void transferRangesForRequest(StreamOutSession session, String tableName, Collection<Range> ranges, Runnable callback)
{
assert ranges.size() > 0;
- logger.info("Beginning transfer process to {} for ranges {}", context,
StringUtils.join(ranges, ", "));
+ logger.info("Beginning transfer process to {} for ranges {}",
session.getHost(), StringUtils.join(ranges, ", "));
try
{
Table table = flushSSTable(tableName);
// send the matching portion of every sstable in the keyspace
- transferSSTablesForRequest(context, tableName, table.getAllSSTables(), ranges);
+ transferSSTablesForRequest(session, tableName, table.getAllSSTables(), ranges);
}
catch (IOException e)
{
@@ -144,21 +143,21 @@ public class StreamOut
/**
* Transfers matching portions of a group of sstables from a single table to the target endpoint.
*/
- public static void transferSSTables(StreamContext context, String table, Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
+ public static void transferSSTables(StreamOutSession session, String table, Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
{
List<PendingFile> pending = createPendingFiles(sstables, ranges);
if (pending.size() > 0)
{
- StreamHeader header = new StreamHeader(context.sessionId, pending.get(0), true);
- StreamOutSession.get(context).addFilesToStream(pending);
+ StreamHeader header = new StreamHeader(session.getSessionId(), pending.get(0), true);
+ session.addFilesToStream(pending);
- logger.info("Streaming file {} to {}", header.getStreamFile(), context.host);
- MessagingService.instance.stream(header, context.host);
+ logger.info("Streaming file {} to {}", header.getStreamFile(), session.getHost());
+ MessagingService.instance.stream(header, session.getHost());
- logger.info("Waiting for transfer to {} to complete", context);
- StreamOutSession.get(context).waitForStreamCompletion();
- logger.info("Done with transfer to {}", context);
+ logger.info("Waiting for transfer to {} to complete", session.getHost());
+ session.waitForStreamCompletion();
+ logger.info("Done with transfer to {}", session.getHost());
}
}
@@ -166,27 +165,27 @@ public class StreamOut
* Transfers the first file for matching portions of a group of sstables and appends a list of other files
* to the header for the requesting destination to take control of the rest of the transfers
*/
- private static void transferSSTablesForRequest(StreamContext context, String table, Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
+ private static void transferSSTablesForRequest(StreamOutSession session, String table, Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
{
List<PendingFile> pending = createPendingFiles(sstables, ranges);
if (pending.size() > 0)
{
- StreamHeader header = new StreamHeader(context.sessionId, pending.get(0), pending, false);
+ StreamHeader header = new StreamHeader(session.getSessionId(), pending.get(0), pending, false);
// In case this happens to be a re-request due to some error condition on the destination side
- if (StreamOutSession.getPendingFiles(context).size() == 0)
- StreamOutSession.get(context).addFilesToStream(pending);
+ if (session.getFiles().isEmpty())
+ session.addFilesToStream(pending);
- logger.info("Streaming file {} to {}", header.getStreamFile(), context.host);
- MessagingService.instance.stream(header, context.host);
- StreamOutSession.get(context).removePending(header.getStreamFile());
+ logger.info("Streaming file {} to {}", header.getStreamFile(), session.getHost());
+ MessagingService.instance.stream(header, session.getHost());
+ session.removePending(header.getStreamFile());
}
else
{
- FileStatus status = new FileStatus("", context.sessionId);
+ FileStatus status = new FileStatus("", session.getSessionId());
status.setAction(FileStatus.Action.EMPTY);
Message message = status.makeStreamStatusMessage();
message.setHeader(StreamOut.TABLE_NAME, table.getBytes());
- MessagingService.instance.sendOneWay(message, context.host);
+ MessagingService.instance.sendOneWay(message, session.getHost());
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java Tue Sep 14 22:25:53 2010
@@ -20,90 +20,47 @@ package org.apache.cassandra.streaming;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.*;
import java.util.concurrent.ConcurrentMap;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.SimpleCondition;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
* This class manages the streaming of multiple files one after the other.
*/
public class StreamOutSession
{
- private static Logger logger = LoggerFactory.getLogger( StreamOutSession.class );
+ private static final Logger logger = LoggerFactory.getLogger( StreamOutSession.class );
- // one host may have multiple stream contexts (think of them as sessions). each context gets its own manager.
- private static ConcurrentMap<StreamContext, StreamOutSession> streamManagers = new ConcurrentHashMap<StreamContext, StreamOutSession>();
- public static final Multimap<InetAddress, StreamContext> destHosts = Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, StreamContext>create());
-
- public static StreamOutSession get(StreamContext context)
- {
- StreamOutSession session = streamManagers.get(context);
- if (session == null)
- {
- StreamOutSession possibleNew = new StreamOutSession(context);
- if ((session = streamManagers.putIfAbsent(context, possibleNew)) == null)
- {
- session = possibleNew;
- destHosts.put(context.host, context);
- }
- }
- return session;
- }
-
- public static void remove(StreamContext context)
+ // one host may have multiple stream sessions.
+ private static final ConcurrentMap<StreamContext, StreamOutSession> streams = new NonBlockingHashMap<StreamContext, StreamOutSession>();
+
+ public static StreamOutSession create(InetAddress host)
{
- if (streamManagers.containsKey(context) && streamManagers.get(context).files.size() == 0)
- {
- streamManagers.remove(context);
- destHosts.remove(context.host, context);
- }
+ return create(host, System.nanoTime());
}
- public static Set<InetAddress> getDestinations()
+ public static StreamOutSession create(InetAddress host, long sessionId)
{
- // the results of streamManagers.keySet() isn't serializable, so create a new set.
- Set<InetAddress> hosts = new HashSet<InetAddress>();
- hosts.addAll(destHosts.keySet());
- return hosts;
+ StreamContext context = new StreamContext(host, sessionId);
+ StreamOutSession session = new StreamOutSession(context);
+ streams.put(context, session);
+ return session;
}
-
- /**
- * this method exists so that we don't have to call StreamOutManager.get() which has a nasty side-effect of
- * indicating that we are streaming to a particular host.
- **/
- public static List<PendingFile> getPendingFiles(StreamContext context)
+
+ public static StreamOutSession get(InetAddress host, long sessionId)
{
- List<PendingFile> list = new ArrayList<PendingFile>();
- StreamOutSession session = streamManagers.get(context);
- if (session != null)
- list.addAll(session.getFiles());
- return list;
+ return streams.get(new StreamContext(host, sessionId));
}
- public static List<PendingFile> getOutgoingFiles(InetAddress host)
+ public void close()
{
- List<PendingFile> list = new ArrayList<PendingFile>();
- for(StreamContext context : destHosts.get(host))
- {
- list.addAll(getPendingFiles(context));
- }
- return list;
+ streams.remove(context);
}
// we need sequential and random access to the files. hence, the map and the list.
@@ -117,6 +74,16 @@ public class StreamOutSession
{
this.context = context;
}
+
+ public InetAddress getHost()
+ {
+ return context.host;
+ }
+
+ public long getSessionId()
+ {
+ return context.sessionId;
+ }
public void addFilesToStream(List<PendingFile> pendingFiles)
{
@@ -142,7 +109,7 @@ public class StreamOutSession
{
if (logger.isDebugEnabled())
logger.debug("Streaming {} ...", pf);
- MessagingService.instance.stream(new StreamHeader(context.sessionId, pf, true), context.host);
+ MessagingService.instance.stream(new StreamHeader(getSessionId(), pf, true), getHost());
}
public void finishAndStartNext(String pfname) throws IOException
@@ -150,23 +117,25 @@ public class StreamOutSession
PendingFile pf = fileMap.remove(pfname);
files.remove(pf);
- if (files.size() > 0)
- streamFile(files.get(0));
- else
+ if (files.isEmpty())
{
if (logger.isDebugEnabled())
- logger.debug("Signalling that streaming is done for {} session
{}", context.host, context.sessionId);
- remove(context);
+ logger.debug("Signalling that streaming is done for {} session
{}", getHost(), getSessionId());
+ close();
condition.signalAll();
}
+ else
+ {
+ streamFile(files.get(0));
+ }
}
public void removePending(PendingFile pf)
{
files.remove(pf);
fileMap.remove(pf.getFilename());
- if (files.size() == 0)
- remove(context);
+ if (files.isEmpty())
+ close();
}
public void waitForStreamCompletion()
@@ -185,4 +154,25 @@ public class StreamOutSession
{
return Collections.unmodifiableList(files);
}
+
+ public static Set<InetAddress> getDestinations()
+ {
+ Set<InetAddress> hosts = new HashSet<InetAddress>();
+ for (StreamOutSession session : streams.values())
+ {
+ hosts.add(session.getHost());
+ }
+ return hosts;
+ }
+
+ public static List<PendingFile> getOutgoingFiles(InetAddress host)
+ {
+ List<PendingFile> list = new ArrayList<PendingFile>();
+ for (Map.Entry<StreamContext, StreamOutSession> entry : streams.entrySet())
+ {
+ if (entry.getKey().host.equals(host))
+ list.addAll(entry.getValue().getFiles());
+ }
+ return list;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Tue Sep 14 22:25:53 2010
@@ -54,15 +54,16 @@ public class StreamRequestVerbHandler im
if (srm.file != null)
{
- // single file request.
+ // single file re-request.
StreamHeader header = new StreamHeader(srm.sessionId, srm.file, false);
MessagingService.instance.stream(header, message.getFrom());
- StreamOutSession.get(new StreamContext(message.getFrom(), srm.sessionId)).removePending(srm.file);
+ StreamOutSession.get(message.getFrom(), srm.sessionId).removePending(srm.file);
}
else
{
// range request.
- StreamOut.transferRangesForRequest(new StreamContext(message.getFrom(), srm.sessionId), srm.table, srm.ranges, null);
+ StreamOutSession session = StreamOutSession.create(message.getFrom(), srm.sessionId);
+ StreamOut.transferRangesForRequest(session, srm.table, srm.ranges, null);
}
}
catch (IOException ex)
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java Tue Sep 14 22:25:53 2010
@@ -45,20 +45,20 @@ public class StreamStatusVerbHandler imp
try
{
FileStatus streamStatus = FileStatus.serializer().deserialize(new DataInputStream(bufIn));
- StreamContext context = new StreamContext(message.getFrom(), streamStatus.getSessionId());
+ StreamOutSession session = StreamOutSession.get(message.getFrom(), streamStatus.getSessionId());
switch (streamStatus.getAction())
{
case DELETE:
- StreamOutSession.get(context).finishAndStartNext(streamStatus.getFile());
+ session.finishAndStartNext(streamStatus.getFile());
break;
case STREAM:
logger.warn("Need to re-stream file {} to {}",
streamStatus.getFile(), message.getFrom());
- StreamOutSession.get(context).retry(streamStatus.getFile());
+ session.retry(streamStatus.getFile());
break;
case EMPTY:
logger.error("Did not find matching ranges on {}",
message.getFrom());
- StreamInSession.get(context).remove();
+ StreamInSession.get(message.getFrom(),
streamStatus.getSessionId()).remove();
if (StorageService.instance.isBootstrapMode())
StorageService.instance.removeBootstrapSource(message.getFrom(), new
String(message.getHeader(StreamOut.TABLE_NAME)));
break;
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java Tue Sep 14 22:25:53 2010
@@ -54,10 +54,10 @@ public class StreamingService implements
{
StringBuilder sb = new StringBuilder();
sb.append("Receiving from:\n");
- for (StreamContext source : StreamInSession.getSources())
+ for (InetAddress source : StreamInSession.getSources())
{
- sb.append(String.format(" %s:\n", source.host.getHostAddress()));
- for (PendingFile pf : StreamInSession.getIncomingFiles(source.host))
+ sb.append(String.format(" %s:\n", source.getHostAddress()));
+ for (PendingFile pf : StreamInSession.getIncomingFiles(source))
{
sb.append(String.format(" %s\n", pf.toString()));
}
@@ -99,13 +99,7 @@ public class StreamingService implements
/** hosts sending incoming streams */
public Set<InetAddress> getStreamSources()
{
- Set<InetAddress> sources = new HashSet<InetAddress>();
-
- for(StreamContext context : StreamInSession.getSources())
- {
- sources.add(context.host);
- }
- return sources;
+ return StreamInSession.getSources();
}
/** details about incoming streams. */
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=997119&r1=997118&r2=997119&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Tue Sep 14 22:25:53 2010
@@ -34,8 +34,6 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.sstable.SSTableUtils;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamContext;
-import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.BeforeClass;
@@ -68,7 +66,7 @@ public class StreamingTransferTest exten
List<Range> ranges = new ArrayList<Range>();
ranges.add(new Range(p.getMinimumToken(), p.getToken("key".getBytes())));
ranges.add(new Range(p.getToken("key2".getBytes()), p.getMinimumToken()));
- StreamOut.transferSSTables(new StreamContext(LOCAL), tablename, Arrays.asList(sstable), ranges);
+ StreamOut.transferSSTables(StreamOutSession.create(LOCAL), tablename, Arrays.asList(sstable), ranges);
// confirm that the SSTable was transferred and registered
ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);
@@ -108,7 +106,7 @@ public class StreamingTransferTest exten
List<Range> ranges = new ArrayList<Range>();
ranges.add(new Range(p.getMinimumToken(), p.getToken("transfer1".getBytes())));
ranges.add(new Range(p.getToken("test2".getBytes()), p.getMinimumToken()));
- StreamOut.transferSSTables(new StreamContext(LOCAL), tablename, Arrays.asList(sstable, sstable2), ranges);
+ StreamOut.transferSSTables(StreamOutSession.create(LOCAL), tablename, Arrays.asList(sstable, sstable2), ranges);
// confirm that the SSTable was transferred and registered
ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);