Author: jbellis
Date: Sun Jan 31 00:21:36 2010
New Revision: 904924

URL: http://svn.apache.org/viewvc?rev=904924&view=rev
Log:
centralize streaming code in org.apache.cassandra.streaming; split out verbs & 
handlers into top-level classes
patch by jbellis; reviewed by stuhood for CASSANDRA-751

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
   (contents, props changed)
      - copied, changed from r904698, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
   (with props)
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
   (contents, props changed)
      - copied, changed from r904698, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
   (with props)
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
   (with props)
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
   (with props)
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
   (contents, props changed)
      - copied, changed from r904698, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
   (with props)
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
   (with props)
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
   (contents, props changed)
      - copied, changed from r904698, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
   (with props)
Removed:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
Modified:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
    
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java 
Sun Jan 31 00:21:36 2010
@@ -27,18 +27,17 @@
  import org.apache.log4j.Logger;
 
  import org.apache.commons.lang.ArrayUtils;
- import org.apache.commons.lang.StringUtils;
 
  import org.apache.cassandra.locator.TokenMetadata;
  import org.apache.cassandra.locator.AbstractReplicationStrategy;
  import org.apache.cassandra.net.*;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.streaming.Streaming;
  import org.apache.cassandra.utils.SimpleCondition;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.gms.FailureDetector;
  import org.apache.cassandra.gms.IFailureDetector;
- import org.apache.cassandra.io.Streaming;
  import com.google.common.collect.Multimap;
  import com.google.common.collect.ArrayListMultimap;
 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
 Sun Jan 31 00:21:36 2010
@@ -25,7 +25,7 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.streaming.StreamContextManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java 
Sun Jan 31 00:21:36 2010
@@ -28,7 +28,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.streaming.Streaming;
 
 import org.apache.log4j.Logger;
 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
 Sun Jan 31 00:21:36 2010
@@ -27,6 +27,8 @@
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.net.FileStreamTask;
+import org.apache.cassandra.streaming.IStreamComplete;
+import org.apache.cassandra.streaming.StreamContextManager;
 
 public class IncomingStreamReader
 {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
 Sun Jan 31 00:21:36 2010
@@ -35,7 +35,7 @@
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.streaming.Streaming;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
 Sun Jan 31 00:21:36 2010
@@ -35,7 +35,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.streaming.Streaming;
 
 /*
  * The load balancing algorithm here is an implementation of

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
 Sun Jan 31 00:21:36 2010
@@ -37,13 +37,12 @@
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
+import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.Streaming;
-import org.apache.cassandra.io.StreamRequestVerbHandler;
 import org.apache.cassandra.io.util.FileUtils;
 
 import org.apache.log4j.Logger;
@@ -210,9 +209,9 @@
         // see BootStrapper for a summary of how the bootstrap verbs interact
         MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, 
new BootStrapper.BootstrapTokenVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST, 
new StreamRequestVerbHandler() );
-        MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE, 
new Streaming.StreamInitiateVerbHandler());
-        
MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE_DONE, new 
Streaming.StreamInitiateDoneVerbHandler());
-        MessagingService.instance.registerVerbHandlers(Verb.STREAM_FINISHED, 
new Streaming.StreamFinishedVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE, 
new StreamInitiateVerbHandler());
+        
MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE_DONE, new 
StreamInitiateDoneVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.STREAM_FINISHED, 
new StreamFinishedVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.READ_RESPONSE, new 
ResponseVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new 
TreeRequestVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new 
AntiEntropyService.TreeResponseVerbHandler());

Copied: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
 (from r904698, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java)
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java&r1=904698&r2=904924&rev=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
 Sun Jan 31 00:21:36 2010
@@ -16,12 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.net.io;
+package org.apache.cassandra.streaming;
 
 import java.io.IOException;
 
 import java.net.InetAddress;
 
+import org.apache.cassandra.streaming.StreamContextManager;
+
 public interface IStreamComplete
 {
     /*

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904924&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
 Sun Jan 31 00:21:36 2010
@@ -0,0 +1,65 @@
+package org.apache.cassandra.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.IStreamComplete;
+import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * This is the callback handler that is invoked when we have
+ * completely received a single file from a remote host.
+ *
+ * TODO if we move this into CFS we could make addSSTables private, improving 
encapsulation.
+*/
+class StreamCompletionHandler implements IStreamComplete
+{
+    private static Logger logger = 
Logger.getLogger(StreamCompletionHandler.class);
+
+    public void onStreamCompletion(InetAddress host, 
StreamContextManager.StreamContext streamContext, 
StreamContextManager.StreamStatus streamStatus) throws IOException
+    {
+        /* Parse the stream context and add the file to the list of SSTables 
in the associated Column Family Store. */
+        if (streamContext.getTargetFile().contains("-Data.db"))
+        {
+            String tableName = streamContext.getTable();
+            File file = new File( streamContext.getTargetFile() );
+            String fileName = file.getName();
+            String [] temp = fileName.split("-");
+
+            //Open the file to see if all parts are now here
+            try
+            {
+                SSTableReader sstable = 
SSTableWriter.renameAndOpen(streamContext.getTargetFile());
+                //TODO add a sanity check that this sstable has all its parts 
and is ok
+                
Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
+                logger.info("Streaming added " + sstable.getFilename());
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException("Not able to add streamed file " + 
streamContext.getTargetFile(), e);
+            }
+        }
+
+        if (logger.isDebugEnabled())
+          logger.debug("Sending a streaming finished message with " + 
streamStatus + " to " + host);
+        /* Send a StreamStatusMessage object which may require the source node 
to re-stream certain files. */
+        StreamContextManager.StreamStatusMessage streamStatusMessage = new 
StreamContextManager.StreamStatusMessage(streamStatus);
+        Message message = 
StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
+        MessagingService.instance.sendOneWay(message, host);
+
+        /* If we're done with everything for this host, remove from bootstrap 
sources */
+        if (StreamContextManager.isDone(host) && 
StorageService.instance.isBootstrapMode())
+        {
+            StorageService.instance.removeBootstrapSource(host, 
streamContext.getTable());
+        }
+    }
+}

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
 (from r904698, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java)
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java&r1=904698&r2=904924&rev=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
 Sun Jan 31 00:21:36 2010
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.net.io;
+package org.apache.cassandra.streaming;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -27,6 +27,7 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.IStreamComplete;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -34,7 +35,7 @@
 
 public class StreamContextManager
 {
-    private static Logger logger_ = 
Logger.getLogger(StreamContextManager.class);
+    private static Logger logger = 
Logger.getLogger(StreamContextManager.class);
     
     public static enum StreamCompletionAction
     {
@@ -160,7 +161,7 @@
             return expectedBytes_;
         }
         
-        void setAction(StreamContextManager.StreamCompletionAction action)
+        public void setAction(StreamContextManager.StreamCompletionAction 
action)
         {
             action_ = action;
         }

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904924&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
 Sun Jan 31 00:21:36 2010
@@ -0,0 +1,50 @@
+package org.apache.cassandra.streaming;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.StreamManager;
+
+public class StreamFinishedVerbHandler implements IVerbHandler
+{
+    private static Logger logger = 
Logger.getLogger(StreamFinishedVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+
+        try
+        {
+            StreamContextManager.StreamStatusMessage streamStatusMessage = 
StreamContextManager.StreamStatusMessage.serializer().deserialize(new 
DataInputStream(bufIn));
+            StreamContextManager.StreamStatus streamStatus = 
streamStatusMessage.getStreamStatus();
+
+            switch (streamStatus.getAction())
+            {
+                case DELETE:
+                    
StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
+                    break;
+
+                case STREAM:
+                    if (logger.isDebugEnabled())
+                        logger.debug("Need to re-stream file " + 
streamStatus.getFile());
+                    StreamManager.instance(message.getFrom()).repeat();
+                    break;
+
+                default:
+                    break;
+            }
+        }
+        catch (IOException ex)
+        {
+            throw new IOError(ex);
+        }
+    }
+}

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java?rev=904924&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
 Sun Jan 31 00:21:36 2010
@@ -0,0 +1,19 @@
+package org.apache.cassandra.streaming;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.StreamManager;
+
+public class StreamInitiateDoneVerbHandler implements IVerbHandler
+{
+    private static Logger logger = 
Logger.getLogger(StreamInitiateDoneVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        if (logger.isDebugEnabled())
+          logger.debug("Received a stream initiate done message ...");
+        StreamManager.instance(message.getFrom()).start();
+    }
+}

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904924&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
 Sun Jan 31 00:21:36 2010
@@ -0,0 +1,145 @@
+package org.apache.cassandra.streaming;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class StreamInitiateVerbHandler implements IVerbHandler
+{
+    private static Logger logger = 
Logger.getLogger(StreamInitiateVerbHandler.class);
+
+    /*
+     * Here we handle the StreamInitiateMessage. Here we get the
+     * array of StreamContexts. We get file names for the column
+     * families associated with the files and replace them with the
+     * file names as obtained from the column family store on the
+     * receiving end.
+    */
+    public void doVerb(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        if (logger.isDebugEnabled())
+            logger.debug(String.format("StreamInitiateVerbeHandler.doVerb %s 
%s %s", message.getVerb(), message.getMessageId(), message.getMessageType()));
+
+        try
+        {
+            StreamInitiateMessage biMsg = 
StreamInitiateMessage.serializer().deserialize(new DataInputStream(bufIn));
+            StreamContextManager.StreamContext[] streamContexts = 
biMsg.getStreamContext();
+
+            if (streamContexts.length == 0)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("no data needed from " + message.getFrom());
+                if (StorageService.instance.isBootstrapMode())
+                    
StorageService.instance.removeBootstrapSource(message.getFrom(), new 
String(message.getHeader(Streaming.TABLE_NAME)));
+                return;
+            }
+
+            Map<String, String> fileNames = getNewNames(streamContexts);
+            Map<String, String> pathNames = new HashMap<String, String>();
+            for (String ssName : fileNames.keySet())
+                pathNames.put(ssName, 
DatabaseDescriptor.getNextAvailableDataLocation());
+            /*
+             * For each of the stream contexts in the incoming message
+             * generate the new file names and store the new file names
+             * in the StreamContextManager.
+            */
+            for (StreamContextManager.StreamContext streamContext : 
streamContexts )
+            {
+                StreamContextManager.StreamStatus streamStatus = new 
StreamContextManager.StreamStatus(streamContext.getTargetFile(), 
streamContext.getExpectedBytes() );
+                String file = getNewFileNameFromOldContextAndNames(fileNames, 
pathNames, streamContext);
+
+                if (logger.isDebugEnabled())
+                  logger.debug("Received Data from  : " + message.getFrom() + 
" " + streamContext.getTargetFile() + " " + file);
+                streamContext.setTargetFile(file);
+                addStreamContext(message.getFrom(), streamContext, 
streamStatus);
+            }
+
+            
StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new 
StreamCompletionHandler());
+            if (logger.isDebugEnabled())
+              logger.debug("Sending a stream initiate done message ...");
+            Message doneMessage = new Message(FBUtilities.getLocalAddress(), 
"", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0] );
+            MessagingService.instance.sendOneWay(doneMessage, 
message.getFrom());
+        }
+        catch (IOException ex)
+        {
+            throw new IOError(ex);
+        }
+    }
+
+    public String getNewFileNameFromOldContextAndNames(Map<String, String> 
fileNames,
+                                                       Map<String, String> 
pathNames,
+                                                       
StreamContextManager.StreamContext streamContext)
+    {
+        File sourceFile = new File( streamContext.getTargetFile() );
+        String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
+        String cfName = piece[0];
+        String ssTableNum = piece[1];
+        String typeOfFile = piece[2];
+
+        String newFileNameExpanded = fileNames.get(streamContext.getTable() + 
"-" + cfName + "-" + ssTableNum);
+        String path = pathNames.get(streamContext.getTable() + "-" + cfName + 
"-" + ssTableNum);
+        //Replace the Data.db type in the new file name with this file's type
+        String newFileName = newFileNameExpanded.replace("Data.db", 
typeOfFile);
+        return path + File.separator + streamContext.getTable() + 
File.separator + newFileName;
+    }
+
+    // todo: this method needs to be private, or package-private at the very 
least for easy unit testing.
+    public Map<String, String> 
getNewNames(StreamContextManager.StreamContext[] streamContexts) throws 
IOException
+    {
+        /*
+         * Mapping for each file set with unique table-CF-i ---> new file name.
+         * E.g. for a file with name <CF>-<i>-Data.db there is a corresponding
+         * <CF>-<i>-Index.db. We maintain a mapping from <table>-<CF>-<i> to a
+         * newly generated file name.
+        */
+        Map<String, String> fileNames = new HashMap<String, String>();
+        /* Get the distinct entries from StreamContexts, i.e. have one entry per 
Data/Index/Filter file set */
+        Set<String> distinctEntries = new HashSet<String>();
+        for ( StreamContextManager.StreamContext streamContext : 
streamContexts )
+        {
+            String[] pieces = FBUtilities.strip(new 
File(streamContext.getTargetFile()).getName(), "-");
+            distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + 
"-" + pieces[1] );
+        }
+
+        /* Generate unique file names per entry */
+        for ( String distinctEntry : distinctEntries )
+        {
+            String tableName;
+            String[] pieces = FBUtilities.strip(distinctEntry, "-");
+            tableName = pieces[0];
+            Table table = Table.open( tableName );
+
+            ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]);
+            if (logger.isDebugEnabled())
+              logger.debug("Generating file name for " + distinctEntry + " 
...");
+            fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
+        }
+
+        return fileNames;
+    }
+
+    private void addStreamContext(InetAddress host, 
StreamContextManager.StreamContext streamContext, 
StreamContextManager.StreamStatus streamStatus)
+    {
+        if (logger.isDebugEnabled())
+          logger.debug("Adding stream context " + streamContext + " for " + 
host + " ...");
+        StreamContextManager.addStreamContext(host, streamContext, 
streamStatus);
+    }
+}

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
 (from r904698, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java)
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java&r1=904698&r2=904924&rev=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
 Sun Jan 31 00:21:36 2010
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.service;
+package org.apache.cassandra.streaming;
 
 import java.io.File;
 import java.io.IOException;
@@ -28,15 +28,14 @@
 import java.net.InetAddress;
 
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.streaming.StreamContextManager;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.util.FileUtils;
 
 import org.apache.log4j.Logger;
 
-/*
- * This class manages the streaming of multiple files 
- * one after the other. 
+/**
+ * This class manages the streaming of multiple files one after the other.
 */
 public class StreamManager
 {   

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=904924&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
 Sun Jan 31 00:21:36 2010
@@ -0,0 +1,76 @@
+package org.apache.cassandra.streaming;
+
+import java.io.*;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+* This class encapsulates the message that needs to be sent to nodes
+* that handoff data. The message contains information about ranges
+* that need to be transferred and the target node.
+*/
+class StreamRequestMessage
+{
+   private static ICompactSerializer<StreamRequestMessage> serializer_;
+   static
+   {
+       serializer_ = new StreamRequestMessageSerializer();
+   }
+
+   protected static ICompactSerializer<StreamRequestMessage> serializer()
+   {
+       return serializer_;
+   }
+
+   protected static Message makeStreamRequestMessage(StreamRequestMessage 
streamRequestMessage)
+   {
+       ByteArrayOutputStream bos = new ByteArrayOutputStream();
+       DataOutputStream dos = new DataOutputStream(bos);
+       try
+       {
+           StreamRequestMessage.serializer().serialize(streamRequestMessage, 
dos);
+       }
+       catch (IOException e)
+       {
+           throw new IOError(e);
+       }
+       return new Message(FBUtilities.getLocalAddress(), 
StageManager.STREAM_STAGE, StorageService.Verb.STREAM_REQUEST, 
bos.toByteArray() );
+   }
+
+   protected StreamRequestMetadata[] streamRequestMetadata_ = new 
StreamRequestMetadata[0];
+
+   // TODO only actually ever need one BM, not an array
+   StreamRequestMessage(StreamRequestMetadata... streamRequestMetadata)
+   {
+       assert streamRequestMetadata != null;
+       streamRequestMetadata_ = streamRequestMetadata;
+   }
+
+    private static class StreamRequestMessageSerializer implements 
ICompactSerializer<StreamRequestMessage>
+    {
+        public void serialize(StreamRequestMessage streamRequestMessage, 
DataOutputStream dos) throws IOException
+        {
+            StreamRequestMetadata[] streamRequestMetadata = 
streamRequestMessage.streamRequestMetadata_;
+            dos.writeInt(streamRequestMetadata.length);
+            for (StreamRequestMetadata bsmd : streamRequestMetadata)
+            {
+                StreamRequestMetadata.serializer().serialize(bsmd, dos);
+            }
+        }
+
+        public StreamRequestMessage deserialize(DataInputStream dis) throws 
IOException
+        {
+            int size = dis.readInt();
+            StreamRequestMetadata[] streamRequestMetadata = new 
StreamRequestMetadata[size];
+            for (int i = 0; i < size; ++i)
+            {
+                streamRequestMetadata[i] = 
StreamRequestMetadata.serializer().deserialize(dis);
+            }
+            return new StreamRequestMessage(streamRequestMetadata);
+        }
+    }
+}

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java?rev=904924&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
 Sun Jan 31 00:21:36 2010
@@ -0,0 +1,80 @@
+package org.apache.cassandra.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+
+/**
+ * This encapsulates information of the list of ranges that a target
+ * node requires to be transferred. This will be bundled in a
+ * StreamRequestMessage and sent to nodes that are going to hand off
+ * the data.
+*/
+class StreamRequestMetadata
+{
+    private static ICompactSerializer<StreamRequestMetadata> serializer_;
+    static
+    {
+        serializer_ = new StreamRequestMetadataSerializer();
+    }
+
+    protected static ICompactSerializer<StreamRequestMetadata> serializer()
+    {
+        return serializer_;
+    }
+
+    protected InetAddress target_;
+    protected Collection<Range> ranges_;
+
+    StreamRequestMetadata(InetAddress target, Collection<Range> ranges)
+    {
+        target_ = target;
+        ranges_ = ranges;
+    }
+
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("");
+        sb.append(target_);
+        sb.append("------->");
+        for ( Range range : ranges_ )
+        {
+            sb.append(range);
+            sb.append(" ");
+        }
+        return sb.toString();
+    }
+
+    private static class StreamRequestMetadataSerializer implements 
ICompactSerializer<StreamRequestMetadata>
+    {
+        public void serialize(StreamRequestMetadata srMetadata, 
DataOutputStream dos) throws IOException
+        {
+            CompactEndPointSerializationHelper.serialize(srMetadata.target_, 
dos);
+            dos.writeInt(srMetadata.ranges_.size());
+            for (Range range : srMetadata.ranges_)
+            {
+                Range.serializer().serialize(range, dos);
+            }
+        }
+
+        public StreamRequestMetadata deserialize(DataInputStream dis) throws 
IOException
+        {
+            InetAddress target = 
CompactEndPointSerializationHelper.deserialize(dis);
+            int size = dis.readInt();
+            List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
+            for( int i = 0; i < size; ++i )
+            {
+                ranges.add(Range.serializer().deserialize(dis));
+            }
+            return new StreamRequestMetadata( target, ranges );
+        }
+    }
+}

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
 (from r904698, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java)
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java&r1=904698&r2=904924&rev=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
 Sun Jan 31 00:21:36 2010
@@ -16,30 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.io;
+ package org.apache.cassandra.streaming;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.IOError;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Collection;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Table;
-
-import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.StreamManager;
 
 import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
 
  /**
  * This verb handler handles the StreamRequestMessage that is sent by

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java?rev=904924&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
 Sun Jan 31 00:21:36 2010
@@ -0,0 +1,166 @@
+package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.net.InetAddress;
+import java.util.*;
+import java.io.IOException;
+import java.io.File;
+import java.io.IOError;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * This class handles streaming data from one node to another.
+ *
+ * For bootstrap,
+ *  1. BOOTSTRAP_TOKEN asks the most-loaded node what Token to use to split 
its Range in two.
+ *  2. STREAM_REQUEST tells source nodes to send us the necessary Ranges
+ *  3. source nodes send STREAM_INITIATE to us to say "get ready to receive 
data" [if there is data to send]
+ *  4. when we have everything set up to receive the data, we send 
STREAM_INITIATE_DONE back to the source nodes and they start streaming
+ *  5. when streaming is complete, we send STREAM_FINISHED to the source so it 
can clean up on its end
+ *
+ * For unbootstrap, the leaving node starts with step 3 (1 and 2 are skipped 
entirely).  This is why
+ * STREAM_INITIATE is a separate verb, rather than just a reply to 
STREAM_REQUEST; the REQUEST is optional.
+ */
+public class Streaming
+{
+    private static final Logger logger = Logger.getLogger(Streaming.class);
+    /** Message header key under which the table name is attached to a stream initiate message. */
+    static String TABLE_NAME = "STREAMING-TABLE-NAME";
+    public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stabilized
+
+    /**
+     * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
+     *
+     * For every table from DatabaseDescriptor.getTables() this flushes the table, waits for
+     * each flush future, anticompacts for the given ranges and transfers the resulting sstables.
+     *
+     * @param target   endpoint to stream to
+     * @param ranges   ranges to transfer; must be non-empty (checked by assertion)
+     * @param callback run once after all tables have been processed; may be null
+     * @throws RuntimeException if waiting on a flush is interrupted or the flush fails
+     * @throws IOError if opening, anticompacting or transferring a table raises an IOException
+     */
+    public static void transferRanges(InetAddress target, Collection<Range> ranges, Runnable callback)
+    {
+        assert ranges.size() > 0;
+
+        if (logger.isDebugEnabled())
+            logger.debug("Beginning transfer process to " + target + " for ranges " + StringUtils.join(ranges, ", "));
+
+        /*
+         * (1) dump all the memtables to disk.
+         * (2) anticompaction -- split out the keys in the range specified
+         * (3) transfer the data.
+        */
+        List<String> tables = DatabaseDescriptor.getTables();
+        for (String tName : tables)
+        {
+            try
+            {
+                Table table = Table.open(tName);
+                if (logger.isDebugEnabled())
+                  logger.debug("Flushing memtables ...");
+                for (Future<?> f : table.flush())
+                {
+                    try
+                    {
+                        f.get();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // Restore the interrupt flag so code further up the stack
+                        // can still observe that this thread was interrupted.
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                    catch (ExecutionException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+                if (logger.isDebugEnabled())
+                  logger.debug("Performing anticompaction ...");
+                /* Get the list of files that need to be streamed */
+                transferSSTables(target, table.forceAntiCompaction(ranges, target), tName); // SSTR GC deletes the file when done
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+        if (callback != null)
+            callback.run();
+    }
+
+    /**
+     * Transfers a group of sstables from a single table to the target endpoint.
+     *
+     * Builds one stream context per sstable file, queues them with the target's
+     * StreamManager, sends a stream initiate message tagged with the table name,
+     * and, when there is at least one context, blocks until streaming completes.
+     *
+     * @param target   endpoint to stream to
+     * @param sstables sstables whose files are to be streamed
+     * @param table    name of the table the sstables belong to
+     * @throws IOException declared for callers; propagated from the calls made here
+     */
+    public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
+    {
+        // NOTE(review): the array is sized assuming every sstable reports exactly
+        // SSTable.FILES_ON_DISK filenames -- confirm against getAllFilenames().
+        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[SSTable.FILES_ON_DISK * sstables.size()];
+        int i = 0;
+        for (SSTableReader sstable : sstables)
+        {
+            for (String filename : sstable.getAllFilenames())
+            {
+                File file = new File(filename);
+                streamContexts[i++] = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), table);
+            }
+        }
+        if (logger.isDebugEnabled())
+          logger.debug("Stream context metadata " + StringUtils.join(streamContexts, ", "));
+
+        StreamManager.instance(target).addFilesToStream(streamContexts);
+        StreamInitiateMessage biMessage = new StreamInitiateMessage(streamContexts);
+        Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
+        // NOTE(review): getBytes() encodes with the platform default charset;
+        // the receiving side presumably decodes the same way -- confirm.
+        message.addHeader(Streaming.TABLE_NAME, table.getBytes());
+        if (logger.isDebugEnabled())
+          logger.debug("Sending a stream initiate message to " + target + " ...");
+        MessagingService.instance.sendOneWay(message, target);
+
+        if (streamContexts.length > 0)
+        {
+            logger.info("Waiting for transfer to " + target + " to complete");
+            StreamManager.instance(target).waitForStreamCompletion();
+            // (StreamManager will delete the streamed file on completion.)
+            logger.info("Done with transfer to " + target);
+        }
+    }
+
+    /**
+     * Request ranges to be transferred.
+     *
+     * Sends a one-way STREAM_REQUEST message to the source, naming the local
+     * address as the target of the requested ranges.
+     *
+     * @param source endpoint asked to stream the ranges
+     * @param ranges ranges being requested
+     */
+    public static void requestRanges(InetAddress source, Collection<Range> ranges)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
+        StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
+        Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
+        MessagingService.instance.sendOneWay(message, source);
+    }
+
+}

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java 
(original)
+++ 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java 
Sun Jan 31 00:21:36 2010
@@ -27,8 +27,8 @@
 import java.util.Map;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.StreamInitiateVerbHandler;
 
 import org.junit.Test;
 
@@ -41,7 +41,7 @@
         streamContexts[0] = new 
StreamContextManager.StreamContext("/baz/foo/Standard1-500-Data.db", 100, 
"Keyspace1");
         streamContexts[1] = new 
StreamContextManager.StreamContext("/bar/foo/Standard1-500-Index.db", 100, 
"Keyspace1");
         streamContexts[2] = new 
StreamContextManager.StreamContext("/bad/foo/Standard1-500-Filter.db", 100, 
"Keyspace1");
-        Streaming.StreamInitiateVerbHandler bivh = new 
Streaming.StreamInitiateVerbHandler();
+        StreamInitiateVerbHandler bivh = new StreamInitiateVerbHandler();
         Map<String, String> fileNames = bivh.getNewNames(streamContexts);
         Map<String, String> paths = new HashMap<String, String>();
         for (String ssName : fileNames.keySet())

Modified: 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java 
(original)
+++ 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java 
Sun Jan 31 00:21:36 2010
@@ -31,6 +31,7 @@
 import org.apache.cassandra.io.SSTableUtils;
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.Streaming;
 import org.apache.cassandra.utils.FBUtilities;
 
 import org.junit.Test;


Reply via email to