Author: jbellis
Date: Tue Sep 14 22:25:31 2010
New Revision: 997118
URL: http://svn.apache.org/viewvc?rev=997118&view=rev
Log:
rename StreamInManager -> StreamInSession and StreamOutManager -> StreamOutSession
patch by jbellis for CASSANDRA-1504
Added:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
- copied, changed from r997082,
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
- copied, changed from r997082,
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Tue Sep 14 22:25:31 2010
@@ -48,7 +48,7 @@ import org.apache.cassandra.net.CompactE
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamOutManager;
+import org.apache.cassandra.streaming.StreamOutSession;
import org.apache.cassandra.utils.*;
/**
@@ -540,7 +540,7 @@ public class AntiEntropyService
{
StreamContext context = new
StreamContext(request.endpoint);
StreamOut.transferSSTables(context, request.cf.left,
sstables, ranges);
- StreamOutManager.remove(context);
+ StreamOutSession.remove(context);
}
});
// request ranges from the remote node
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Tue Sep 14 22:25:31 2010
@@ -50,14 +50,14 @@ public class IncomingStreamReader
lastFile = header.getStreamFile();
initiatedTransfer = header.initiatedTransfer;
context = new StreamContext(remoteAddress.getAddress(),
header.getSessionId());
- StreamInManager.activeStreams.put(context, pendingFile);
+ StreamInSession.activeStreams.put(context, pendingFile);
assert pendingFile != null;
// For transfers setup the status and for replies to requests, prepare
the list
// of available files to request.
if (initiatedTransfer)
streamStatus = new FileStatus(lastFile.getFilename(),
header.getSessionId());
else if (header.getPendingFiles() != null)
-
StreamInManager.get(context).addFilesToRequest(header.getPendingFiles());
+
StreamInSession.get(context).addFilesToRequest(header.getPendingFiles());
}
public void read() throws IOException
@@ -98,7 +98,7 @@ public class IncomingStreamReader
finally
{
fc.close();
- StreamInManager.activeStreams.remove(context, pendingFile);
+ StreamInSession.activeStreams.remove(context, pendingFile);
}
if (logger.isDebugEnabled())
@@ -108,7 +108,7 @@ public class IncomingStreamReader
else
{
FileStatusHandler.addSSTable(pendingFile);
- StreamInManager.get(context).finishAndRequestNext(lastFile);
+ StreamInSession.get(context).finishAndRequestNext(lastFile);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Tue
Sep 14 22:25:31 2010
@@ -52,7 +52,7 @@ public class StreamIn
if (logger.isDebugEnabled())
logger.debug("Requesting from {} ranges {}", source,
StringUtils.join(ranges, ", "));
StreamContext context = new StreamContext(source);
- StreamInManager.get(context);
+ StreamInSession.get(context);
Message message = new
StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName,
context.sessionId).makeMessage();
MessagingService.instance.sendOneWay(message, source);
}
Copied:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
(from r997082,
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java)
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?p2=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java&p1=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java&r1=997082&r2=997118&rev=997118&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
Tue Sep 14 22:25:31 2010
@@ -32,36 +32,36 @@ import org.apache.cassandra.service.Stor
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** each context gets its own StreamInManager. So there may be >1
StreamInManager per host */
-public class StreamInManager
+/** each context gets its own StreamInSession. So there may be >1
StreamInSession per host */
+public class StreamInSession
{
- private static final Logger logger =
LoggerFactory.getLogger(StreamInManager.class);
+ private static final Logger logger =
LoggerFactory.getLogger(StreamInSession.class);
- private static ConcurrentMap<StreamContext, StreamInManager>
streamManagers = new ConcurrentHashMap<StreamContext, StreamInManager>(0);
+ private static ConcurrentMap<StreamContext, StreamInSession>
streamManagers = new ConcurrentHashMap<StreamContext, StreamInSession>(0);
public static final Multimap<StreamContext, PendingFile> activeStreams =
Multimaps.synchronizedMultimap(HashMultimap.<StreamContext,
PendingFile>create());
public static final Multimap<InetAddress, StreamContext> sourceHosts =
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress,
StreamContext>create());
private final List<PendingFile> pendingFiles = new
ArrayList<PendingFile>();
private final StreamContext context;
- private StreamInManager(StreamContext context)
+ private StreamInSession(StreamContext context)
{
this.context = context;
}
- public synchronized static StreamInManager get(StreamContext context)
+ public synchronized static StreamInSession get(StreamContext context)
{
- StreamInManager manager = streamManagers.get(context);
- if (manager == null)
+ StreamInSession session = streamManagers.get(context);
+ if (session == null)
{
- StreamInManager possibleNew = new StreamInManager(context);
- if ((manager = streamManagers.putIfAbsent(context, possibleNew))
== null)
+ StreamInSession possibleNew = new StreamInSession(context);
+ if ((session = streamManagers.putIfAbsent(context, possibleNew))
== null)
{
- manager = possibleNew;
+ session = possibleNew;
sourceHosts.put(context.host, context);
}
}
- return manager;
+ return session;
}
public void addFilesToRequest(List<PendingFile> pendingFiles)
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Tue
Sep 14 22:25:31 2010
@@ -68,7 +68,7 @@ public class StreamOut
StreamContext context = new StreamContext(target);
// this is so that this target shows up as a destination while
anticompaction is happening.
- StreamOutManager.get(context);
+ StreamOutSession.get(context);
logger.info("Beginning transfer process to {} for ranges {}", context,
StringUtils.join(ranges, ", "));
@@ -84,7 +84,7 @@ public class StreamOut
}
finally
{
- StreamOutManager.remove(context);
+ StreamOutSession.remove(context);
}
if (callback != null)
callback.run();
@@ -151,13 +151,13 @@ public class StreamOut
if (pending.size() > 0)
{
StreamHeader header = new StreamHeader(context.sessionId,
pending.get(0), true);
- StreamOutManager.get(context).addFilesToStream(pending);
+ StreamOutSession.get(context).addFilesToStream(pending);
logger.info("Streaming file {} to {}", header.getStreamFile(),
context.host);
MessagingService.instance.stream(header, context.host);
logger.info("Waiting for transfer to {} to complete", context);
- StreamOutManager.get(context).waitForStreamCompletion();
+ StreamOutSession.get(context).waitForStreamCompletion();
logger.info("Done with transfer to {}", context);
}
}
@@ -173,12 +173,12 @@ public class StreamOut
{
StreamHeader header = new StreamHeader(context.sessionId,
pending.get(0), pending, false);
// In case this happens to be a re-request due to some error
condition on the destination side
- if (StreamOutManager.getPendingFiles(context).size() == 0)
- StreamOutManager.get(context).addFilesToStream(pending);
+ if (StreamOutSession.getPendingFiles(context).size() == 0)
+ StreamOutSession.get(context).addFilesToStream(pending);
logger.info("Streaming file {} to {}", header.getStreamFile(),
context.host);
MessagingService.instance.stream(header, context.host);
-
StreamOutManager.get(context).removePending(header.getStreamFile());
+
StreamOutSession.get(context).removePending(header.getStreamFile());
}
else
{
Copied:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
(from r997082,
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java)
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?p2=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java&p1=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java&r1=997082&r2=997118&rev=997118&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
Tue Sep 14 22:25:31 2010
@@ -43,27 +43,27 @@ import com.google.common.collect.Multima
/**
* This class manages the streaming of multiple files one after the other.
*/
-public class StreamOutManager
+public class StreamOutSession
{
- private static Logger logger = LoggerFactory.getLogger(
StreamOutManager.class );
+ private static Logger logger = LoggerFactory.getLogger(
StreamOutSession.class );
// one host may have multiple stream contexts (think of them as sessions).
each context gets its own manager.
- private static ConcurrentMap<StreamContext, StreamOutManager>
streamManagers = new ConcurrentHashMap<StreamContext, StreamOutManager>();
+ private static ConcurrentMap<StreamContext, StreamOutSession>
streamManagers = new ConcurrentHashMap<StreamContext, StreamOutSession>();
public static final Multimap<InetAddress, StreamContext> destHosts =
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress,
StreamContext>create());
- public static StreamOutManager get(StreamContext context)
+ public static StreamOutSession get(StreamContext context)
{
- StreamOutManager manager = streamManagers.get(context);
- if (manager == null)
+ StreamOutSession session = streamManagers.get(context);
+ if (session == null)
{
- StreamOutManager possibleNew = new StreamOutManager(context);
- if ((manager = streamManagers.putIfAbsent(context, possibleNew))
== null)
+ StreamOutSession possibleNew = new StreamOutSession(context);
+ if ((session = streamManagers.putIfAbsent(context, possibleNew))
== null)
{
- manager = possibleNew;
+ session = possibleNew;
destHosts.put(context.host, context);
}
}
- return manager;
+ return session;
}
public static void remove(StreamContext context)
@@ -90,9 +90,9 @@ public class StreamOutManager
public static List<PendingFile> getPendingFiles(StreamContext context)
{
List<PendingFile> list = new ArrayList<PendingFile>();
- StreamOutManager manager = streamManagers.get(context);
- if (manager != null)
- list.addAll(manager.getFiles());
+ StreamOutSession session = streamManagers.get(context);
+ if (session != null)
+ list.addAll(session.getFiles());
return list;
}
@@ -113,7 +113,7 @@ public class StreamOutManager
private final StreamContext context;
private final SimpleCondition condition = new SimpleCondition();
- private StreamOutManager(StreamContext context)
+ private StreamOutSession(StreamContext context)
{
this.context = context;
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
Tue Sep 14 22:25:31 2010
@@ -57,7 +57,7 @@ public class StreamRequestVerbHandler im
// single file request.
StreamHeader header = new StreamHeader(srm.sessionId,
srm.file, false);
MessagingService.instance.stream(header, message.getFrom());
- StreamOutManager.get(new StreamContext(message.getFrom(),
srm.sessionId)).removePending(srm.file);
+ StreamOutSession.get(new StreamContext(message.getFrom(),
srm.sessionId)).removePending(srm.file);
}
else
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
Tue Sep 14 22:25:31 2010
@@ -50,15 +50,15 @@ public class StreamStatusVerbHandler imp
switch (streamStatus.getAction())
{
case DELETE:
-
StreamOutManager.get(context).finishAndStartNext(streamStatus.getFile());
+
StreamOutSession.get(context).finishAndStartNext(streamStatus.getFile());
break;
case STREAM:
logger.warn("Need to re-stream file {} to {}",
streamStatus.getFile(), message.getFrom());
-
StreamOutManager.get(context).retry(streamStatus.getFile());
+
StreamOutSession.get(context).retry(streamStatus.getFile());
break;
case EMPTY:
logger.error("Did not find matching ranges on {}",
message.getFrom());
- StreamInManager.get(context).remove();
+ StreamInSession.get(context).remove();
if (StorageService.instance.isBootstrapMode())
StorageService.instance.removeBootstrapSource(message.getFrom(), new
String(message.getHeader(StreamOut.TABLE_NAME)));
break;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=997118&r1=997117&r2=997118&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
Tue Sep 14 22:25:31 2010
@@ -54,19 +54,19 @@ public class StreamingService implements
{
StringBuilder sb = new StringBuilder();
sb.append("Receiving from:\n");
- for (StreamContext source : StreamInManager.getSources())
+ for (StreamContext source : StreamInSession.getSources())
{
sb.append(String.format(" %s:\n", source.host.getHostAddress()));
- for (PendingFile pf :
StreamInManager.getIncomingFiles(source.host))
+ for (PendingFile pf :
StreamInSession.getIncomingFiles(source.host))
{
sb.append(String.format(" %s\n", pf.toString()));
}
}
sb.append("Sending to:\n");
- for (InetAddress dest : StreamOutManager.getDestinations())
+ for (InetAddress dest : StreamOutSession.getDestinations())
{
sb.append(String.format(" %s:\n", dest.getHostAddress()));
- for (PendingFile pf : StreamOutManager.getOutgoingFiles(dest))
+ for (PendingFile pf : StreamOutSession.getOutgoingFiles(dest))
{
sb.append(String.format(" %s\n", pf.toString()));
}
@@ -77,7 +77,7 @@ public class StreamingService implements
/** hosts receiving outgoing streams. */
public Set<InetAddress> getStreamDestinations()
{
- return StreamOutManager.getDestinations();
+ return StreamOutSession.getDestinations();
}
/** outgoing streams */
@@ -91,7 +91,7 @@ public class StreamingService implements
if (!existingDestinations.contains(dest))
return files;
- for (PendingFile f : StreamOutManager.getOutgoingFiles(dest))
+ for (PendingFile f : StreamOutSession.getOutgoingFiles(dest))
files.add(String.format("%s", f.toString()));
return files;
}
@@ -101,7 +101,7 @@ public class StreamingService implements
{
Set<InetAddress> sources = new HashSet<InetAddress>();
- for(StreamContext context : StreamInManager.getSources())
+ for(StreamContext context : StreamInSession.getSources())
{
sources.add(context.host);
}
@@ -112,7 +112,7 @@ public class StreamingService implements
public List<String> getIncomingFiles(String host) throws IOException
{
List<String> files = new ArrayList<String>();
- for (PendingFile pf :
StreamInManager.getIncomingFiles(InetAddress.getByName(host)))
+ for (PendingFile pf :
StreamInSession.getIncomingFiles(InetAddress.getByName(host)))
{
files.add(String.format("%s: %s", pf.desc.ksname, pf.toString()));
}