Author: gdusbabek
Date: Tue May 11 17:59:21 2010
New Revision: 943209
URL: http://svn.apache.org/viewvc?rev=943209&view=rev
Log:
indicate stream source/destinations before anticompaction. patch by gdusbabek,
reviewed by jbellis. CASSANDRA-956
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInManager.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamIn.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=943209&r1=943208&r2=943209&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamIn.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamIn.java
Tue May 11 17:59:21 2010
@@ -44,6 +44,7 @@ public class StreamIn
{
if (logger.isDebugEnabled())
logger.debug("Requesting from " + source + " ranges " +
StringUtils.join(ranges, ", "));
+ StreamInManager.waitingForAnticompaction.put(source, tableName);
StreamRequestMetadata streamRequestMetadata = new
StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges, tableName);
Message message = StreamRequestMessage.makeStreamRequestMessage(new
StreamRequestMessage(streamRequestMetadata));
MessagingService.instance.sendOneWay(message, source);
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=943209&r1=943208&r2=943209&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInManager.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInManager.java
Tue May 11 17:59:21 2010
@@ -40,6 +40,9 @@ class StreamInManager
public static final Map<InetAddress, IStreamComplete>
streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
public static final Multimap<InetAddress, PendingFile> activeStreams =
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, PendingFile>create());
+
+ /** keep track of which hosts this node has sent requests for ranges to. */
+ public static final Multimap<InetAddress, String> waitingForAnticompaction = Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, String>create());
public synchronized static PendingFile getStreamContext(InetAddress key)
{
@@ -69,6 +72,7 @@ class StreamInManager
HashSet<InetAddress> set = new HashSet<InetAddress>();
set.addAll(ctxBag_.keySet());
set.addAll(activeStreams.keySet());
+ set.addAll(waitingForAnticompaction.keySet());
return set;
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=943209&r1=943208&r2=943209&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
Tue May 11 17:59:21 2010
@@ -70,6 +70,7 @@ public class StreamInitiateVerbHandler i
logger.debug("no data needed from " + message.getFrom());
if (StorageService.instance.isBootstrapMode())
StorageService.instance.removeBootstrapSource(message.getFrom(), new
String(message.getHeader(StreamOut.TABLE_NAME)));
+ StreamInManager.waitingForAnticompaction.removeAll(message.getFrom());
return;
}
@@ -91,6 +92,7 @@ public class StreamInitiateVerbHandler i
logger.debug("Received Data from : " + message.getFrom() +
" " + pendingFile.getTargetFile() + " " + file);
pendingFile.setTargetFile(file);
addStreamContext(message.getFrom(), pendingFile, streamStatus);
+ StreamInManager.waitingForAnticompaction.remove(message.getFrom(), pendingFile.getTable());
}
StreamInManager.registerStreamCompletionHandler(message.getFrom(),
new StreamCompletionHandler());
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=943209&r1=943208&r2=943209&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Tue May 11 17:59:21 2010
@@ -72,6 +72,11 @@ public class StreamOut
public static void transferRanges(InetAddress target, String tableName,
Collection<Range> ranges, Runnable callback)
{
assert ranges.size() > 0;
+
+ // this is a sneaky way of indicating target as a destination node. it is a lame way of doing it and will
+ // change as part of fixing CASSANDRA-1076.
+ StreamOutManager.get(target);
+
logger.debug("Beginning transfer process to " + target + " for ranges
" + StringUtils.join(ranges, ", "));