Author: jbellis
Date: Wed Nov 11 16:11:36 2009
New Revision: 834937

URL: http://svn.apache.org/viewvc?rev=834937&view=rev
Log:
clean up transfer code from BootstrapMetadataVerbHandler (BMVH); move to Streaming.java
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-435

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=834937&r1=834936&r2=834937&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Wed Nov 11 16:11:36 2009
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.IOError;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Collection;
@@ -28,6 +29,8 @@
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.Streaming;
+
 import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
@@ -35,10 +38,11 @@
 import org.apache.cassandra.net.io.StreamContextManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.StreamManager;
-import org.apache.cassandra.utils.LogUtil;
+
 import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
 
-/**
+ /**
  * This verb handler handles the BootstrapMetadataMessage that is sent
  * by the leader to the nodes that are responsible for handing off data. 
 */
@@ -61,125 +65,17 @@
         {
             BootstrapMetadataMessage bsMetadataMessage = 
BootstrapMetadataMessage.serializer().deserialize(bufIn);
             BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
-            
-            /*
-             * This is for debugging purposes. Remove later.
-            */
-            for ( BootstrapMetadata bsmd : bsMetadata )
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug(bsmd.toString());                              
        
-            }
-            
-            for ( BootstrapMetadata bsmd : bsMetadata )
+
+            for (BootstrapMetadata bsmd : bsMetadata)
             {
-                long startTime = System.currentTimeMillis();
-                doTransfer(bsmd.target_, bsmd.ranges_);     
                 if (logger_.isDebugEnabled())
-                  logger_.debug("Time taken to boostrap " + 
-                        bsmd.target_ + 
-                        " is " + 
-                        (System.currentTimeMillis() - startTime) +
-                        " msecs.");
-            }
-        }
-        catch ( IOException ex )
-        {
-            logger_.info(LogUtil.throwableToString(ex));
-        }
-    }
-    
-    /*
-     * This method needs to figure out the files on disk
-     * locally for each range and then stream them using
-     * the Bootstrap protocol to the target endpoint.
-    */
-    private void doTransfer(InetAddress target, Collection<Range> ranges) 
throws IOException
-    {
-        if ( ranges.size() == 0 )
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("No ranges to give scram ...");
-            return;
-        }
-        
-        /* Just for debugging process - remove later */            
-        for ( Range range : ranges )
-        {
-            StringBuilder sb = new StringBuilder("");                
-            sb.append(range.toString());
-            sb.append(" ");            
-            if (logger_.isDebugEnabled())
-              logger_.debug("Beginning transfer process to " + target + " for 
ranges " + sb.toString());                
-        }
-      
-        /*
-         * (1) First we dump all the memtables to disk.
-         * (2) Run a version of compaction which will basically
-         *     put the keys in the range specified into a directory
-         *     named as per the endpoint it is destined for inside the
-         *     bootstrap directory.
-         * (3) Handoff the data.
-        */
-        List<String> tables = DatabaseDescriptor.getTables();
-        for ( String tName : tables )
-        {
-            Table table = Table.open(tName);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Flushing memtables ...");
-            table.flush(false);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Forcing compaction ...");
-            /* Get the counting bloom filter for each endpoint and the list of 
files that need to be streamed */
-            List<String> fileList = new ArrayList<String>();
-            for (SSTableReader sstable : table.forceAntiCompaction(ranges, 
target))
-            {
-                fileList.add(sstable.indexFilename());
-                fileList.add(sstable.filterFilename());
-                fileList.add(sstable.getFilename());
+                    logger_.debug(bsmd.toString());
+                Streaming.transferRanges(bsmd.target_, bsmd.ranges_);
             }
-            doHandoff(target, fileList, tName);
-            //In Handoff, Streaming the file also deletes the file, so no 
cleanup needed            
-        }
-    }
-
-    /**
-     * Stream the files in the bootstrap directory over to the
-     * node being bootstrapped.
-    */
-    private void doHandoff(InetAddress target, List<String> fileList, String 
table) throws IOException
-    {
-        List<File> filesList = new ArrayList<File>();
-        for(String file : fileList)
-        {
-            filesList.add(new File(file));
         }
-        File[] files = filesList.toArray(new File[0]);
-        StreamContextManager.StreamContext[] streamContexts = new 
StreamContextManager.StreamContext[files.length];
-        int i = 0;
-        for ( File file : files )
-        {
-            streamContexts[i] = new 
StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), 
table);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Stream context metadata " + streamContexts[i]);
-            ++i;
-        }
-        
-        if ( files.length > 0 )
+        catch (IOException ex)
         {
-            /* Set up the stream manager with the files that need to streamed 
*/
-            StreamManager.instance(target).addFilesToStream(streamContexts);
-            /* Send the bootstrap initiate message */
-            BootstrapInitiateMessage biMessage = new 
BootstrapInitiateMessage(streamContexts);
-            Message message = 
BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Sending a bootstrap initiate message to " + 
target + " ...");
-            MessagingService.instance().sendOneWay(message, target);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Waiting for transfer to " + target + " to 
complete");
-            StreamManager.instance(target).waitForStreamCompletion();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Done with transfer to " + target);  
+            throw new IOError(ex);
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=834937&r1=834936&r2=834937&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Wed Nov 11 16:11:36 2009
@@ -24,6 +24,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Arrays;
 
 import org.apache.log4j.Logger;
 import org.apache.commons.lang.StringUtils;
@@ -128,6 +129,12 @@
         return path;
     }
 
+    /** @return full paths of all files belonging to this SSTable, ordered as getFilename(), indexFilename(), filterFilename() */
+    public List<String> getAllFilenames()
+    {
+        return Arrays.asList(new String[] { getFilename(), indexFilename(), filterFilename() });
+    }
+
     public String getColumnFamilyName()
     {
         return columnFamilyName;

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=834937&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Nov 11 16:11:36 2009
@@ -0,0 +1,98 @@
+package org.apache.cassandra.io;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.BootstrapInitiateMessage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StreamManager;
+
+public class Streaming
+{
+    private static final Logger logger = Logger.getLogger(Streaming.class);
+
+    /**
+     * Flushes and anticompacts every table for the given (non-empty) ranges, then
+     * streams the resulting sstable files to target using the Bootstrap protocol.
+     */
+    public static void transferRanges(InetAddress target, Collection<Range> ranges) throws IOException
+    {
+        assert ranges.size() > 0;
+
+        if (logger.isDebugEnabled())
+            logger.debug("Beginning transfer process to " + target + " for ranges " + StringUtils.join(ranges, ", "));
+
+        /*
+         * (1) First we dump all the memtables to disk.
+         * (2) Run a version of compaction which will basically
+         *     put the keys in the range specified into a directory
+         *     named as per the endpoint it is destined for inside the
+         *     bootstrap directory.
+         * (3) Handoff the data.
+        */
+        for (String tName : DatabaseDescriptor.getTables())
+        {
+            Table table = Table.open(tName);
+            if (logger.isDebugEnabled())
+              logger.debug("Flushing memtables ...");
+            table.flush(false);
+            if (logger.isDebugEnabled())
+              logger.debug("Performing anticompaction ...");
+            /* Get the list of files that need to be streamed */
+            List<String> fileList = new ArrayList<String>();
+            for (SSTableReader sstable : table.forceAntiCompaction(ranges, target))
+                fileList.addAll(sstable.getAllFilenames());
+            transferOneTable(target, fileList, tName); // also deletes the file, so no further cleanup needed
+        }
+    }
+
+    /**
+     * Stream the given files of one table to the node being bootstrapped and
+     * wait for the stream to complete; returns immediately if fileList is empty.
+     * @param target   endpoint that receives the files
+     * @param fileList paths of the files to stream
+     * @param table    name of the table the files belong to
+     */
+    private static void transferOneTable(InetAddress target, List<String> fileList, String table) throws IOException
+    {
+        if (fileList.isEmpty())
+            return;
+
+        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[fileList.size()];
+        int i = 0;
+        for (String filename : fileList)
+        {
+            File file = new File(filename);
+            StreamContextManager.StreamContext context = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), table);
+            streamContexts[i++] = context;
+            if (logger.isDebugEnabled())
+              logger.debug("Stream context metadata " + context); // not streamContexts[i]: i already points past this entry
+        }
+
+        /* Set up the stream manager with the files that need to streamed */
+        StreamManager.instance(target).addFilesToStream(streamContexts);
+        /* Send the bootstrap initiate message */
+        BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(streamContexts);
+        Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
+        if (logger.isDebugEnabled())
+          logger.debug("Sending a bootstrap initiate message to " + target + " ...");
+        MessagingService.instance().sendOneWay(message, target);
+        if (logger.isDebugEnabled())
+          logger.debug("Waiting for transfer to " + target + " to complete");
+        StreamManager.instance(target).waitForStreamCompletion();
+        if (logger.isDebugEnabled())
+          logger.debug("Done with transfer to " + target);
+    }
+}


Reply via email to