Author: gdusbabek
Date: Tue May 11 16:59:55 2010
New Revision: 943175

URL: http://svn.apache.org/viewvc?rev=943175&view=rev
Log:
indicate stream sources and destinations before anticompaction. patch by 
gdusbabek, reviewed by jbellis. CASSANDRA-956

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=943175&r1=943174&r2=943175&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Tue 
May 11 16:59:55 2010
@@ -45,6 +45,7 @@ public class StreamIn
     {
         if (logger.isDebugEnabled())
             logger.debug("Requesting from " + source + " ranges " + 
StringUtils.join(ranges, ", "));
+        StreamInManager.waitingForAnticompaction.put(source, tableName);
         StreamRequestMetadata streamRequestMetadata = new 
StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges, tableName);
         Message message = StreamRequestMessage.makeStreamRequestMessage(new 
StreamRequestMessage(streamRequestMetadata));
         MessagingService.instance.sendOneWay(message, source);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=943175&r1=943174&r2=943175&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java 
Tue May 11 16:59:55 2010
@@ -41,6 +41,9 @@ class StreamInManager
     public static final Map<InetAddress, IStreamComplete> 
streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
 
     public static final Multimap<InetAddress, PendingFile> activeStreams = 
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, PendingFile>create());
+    
+    /** keep track of which hosts this node has sent requests for ranges to. */
+    public static final Multimap<InetAddress, String> waitingForAnticompaction 
= Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, String>create());
 
     public synchronized static PendingFile getStreamContext(InetAddress key)
     {        
@@ -70,6 +73,7 @@ class StreamInManager
         HashSet<InetAddress> set = new HashSet<InetAddress>();
         set.addAll(ctxBag_.keySet());
         set.addAll(activeStreams.keySet());
+        set.addAll(waitingForAnticompaction.keySet());
         return set;
     }
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=943175&r1=943174&r2=943175&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
 Tue May 11 16:59:55 2010
@@ -67,6 +67,7 @@ public class StreamInitiateVerbHandler i
                     logger.debug("no data needed from " + message.getFrom());
                 if (StorageService.instance.isBootstrapMode())
                     
StorageService.instance.removeBootstrapSource(message.getFrom(), new 
String(message.getHeader(StreamOut.TABLE_NAME)));
+                
StreamInManager.waitingForAnticompaction.removeAll(message.getFrom());
                 return;
             }
 
@@ -85,6 +86,7 @@ public class StreamInitiateVerbHandler i
                 if (logger.isDebugEnabled())
                   logger.debug("Preparing to receive stream from " + 
message.getFrom() + ": " + remoteFile + " -> " + localFile);
                 addStreamContext(message.getFrom(), localFile, streamStatus);
+                
StreamInManager.waitingForAnticompaction.remove(message.getFrom(), 
remoteFile.getDescriptor().ksname);
             }
 
             StreamInManager.registerStreamCompletionHandler(message.getFrom(), 
new StreamCompletionHandler());

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=943175&r1=943174&r2=943175&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Tue 
May 11 16:59:55 2010
@@ -71,6 +71,10 @@ public class StreamOut
     public static void transferRanges(InetAddress target, String tableName, 
Collection<Range> ranges, Runnable callback)
     {
         assert ranges.size() > 0;
+        
+        // this is a sneaking way of indicating target as a destination node. 
it is a lame way of doing it and will 
+        // change as part of fixing CASSANDRA-1076.
+        StreamOutManager.get(target);
 
         logger.debug("Beginning transfer process to " + target + " for ranges 
" + StringUtils.join(ranges, ", "));
 



Reply via email to