Author: gdusbabek
Date: Mon Jun 14 18:37:22 2010
New Revision: 954581

URL: http://svn.apache.org/viewvc?rev=954581&view=rev
Log:
Remove the dead StreamOutManager (SOM) after AntiEntropyService (AES) streams files, and
reset the SOM's wait condition when new files are added. Patch by gdusbabek, reviewed by jbellis. CASSANDRA-1169

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOutManager.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/SimpleCondition.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=954581&r1=954580&r2=954581&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon Jun 14 18:37:22 2010
@@ -22,6 +22,7 @@
  * detect incomplete commit log hearders (CASSANDRA-1119)
  * force anti-entropy service to stream files on the stream stage to avoid
    sending streams out of order (CASSANDRA-1169)
+ * remove inactive stream managers after AES streams files (CASSANDRA-1169)
 
 
 0.6.2

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=954581&r1=954580&r2=954581&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
 Mon Jun 14 18:37:22 2010
@@ -39,6 +39,7 @@ import org.apache.cassandra.streaming.St
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamOutManager;
 import org.apache.cassandra.utils.*;
 
 import org.apache.log4j.Logger;
@@ -629,6 +630,7 @@ public class AntiEntropyService
                     protected void runMayThrow() throws Exception
                     {
                         StreamOut.transferSSTables(remote, sstables, cf.left);
+                        StreamOutManager.remove(remote);
                     }
                 });
                 f.get();

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=954581&r1=954580&r2=954581&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOutManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOutManager.java
 Mon Jun 14 18:37:22 2010
@@ -106,6 +106,8 @@ public class StreamOutManager
     
     public void addFilesToStream(PendingFile[] pendingFiles)
     {
+        // reset the condition in case this SOM is getting reused before it can be removed.
+        condition.reset();
         for (PendingFile pendingFile : pendingFiles)
         {
             if (logger.isDebugEnabled())

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/SimpleCondition.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/SimpleCondition.java?rev=954581&r1=954580&r2=954581&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/SimpleCondition.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/SimpleCondition.java
 Mon Jun 14 18:37:22 2010
@@ -37,6 +37,11 @@ public class SimpleCondition implements 
         while (!set)
             wait();
     }
+    
+    public synchronized void reset()
+    {
+        set = false;
+    }
 
     public synchronized boolean await(long time, TimeUnit unit) throws InterruptedException
     {


Reply via email to