Author: jbellis
Date: Wed Nov 11 16:11:36 2009
New Revision: 834937
URL: http://svn.apache.org/viewvc?rev=834937&view=rev
Log:
clean up transfer code from BMVH; move to Streaming.java
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-435
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=834937&r1=834936&r2=834937&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
Wed Nov 11 16:11:36 2009
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
+import java.io.IOError;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
@@ -28,6 +29,8 @@
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.Streaming;
+
import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -35,10 +38,11 @@
import org.apache.cassandra.net.io.StreamContextManager;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.StreamManager;
-import org.apache.cassandra.utils.LogUtil;
+
import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
-/**
+ /**
* This verb handler handles the BootstrapMetadataMessage that is sent
* by the leader to the nodes that are responsible for handing off data.
*/
@@ -61,125 +65,17 @@
{
BootstrapMetadataMessage bsMetadataMessage =
BootstrapMetadataMessage.serializer().deserialize(bufIn);
BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
-
- /*
- * This is for debugging purposes. Remove later.
- */
- for ( BootstrapMetadata bsmd : bsMetadata )
- {
- if (logger_.isDebugEnabled())
- logger_.debug(bsmd.toString());
- }
-
- for ( BootstrapMetadata bsmd : bsMetadata )
+
+ for (BootstrapMetadata bsmd : bsMetadata)
{
- long startTime = System.currentTimeMillis();
- doTransfer(bsmd.target_, bsmd.ranges_);
if (logger_.isDebugEnabled())
- logger_.debug("Time taken to boostrap " +
- bsmd.target_ +
- " is " +
- (System.currentTimeMillis() - startTime) +
- " msecs.");
- }
- }
- catch ( IOException ex )
- {
- logger_.info(LogUtil.throwableToString(ex));
- }
- }
-
- /*
- * This method needs to figure out the files on disk
- * locally for each range and then stream them using
- * the Bootstrap protocol to the target endpoint.
- */
- private void doTransfer(InetAddress target, Collection<Range> ranges)
throws IOException
- {
- if ( ranges.size() == 0 )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("No ranges to give scram ...");
- return;
- }
-
- /* Just for debugging process - remove later */
- for ( Range range : ranges )
- {
- StringBuilder sb = new StringBuilder("");
- sb.append(range.toString());
- sb.append(" ");
- if (logger_.isDebugEnabled())
- logger_.debug("Beginning transfer process to " + target + " for
ranges " + sb.toString());
- }
-
- /*
- * (1) First we dump all the memtables to disk.
- * (2) Run a version of compaction which will basically
- * put the keys in the range specified into a directory
- * named as per the endpoint it is destined for inside the
- * bootstrap directory.
- * (3) Handoff the data.
- */
- List<String> tables = DatabaseDescriptor.getTables();
- for ( String tName : tables )
- {
- Table table = Table.open(tName);
- if (logger_.isDebugEnabled())
- logger_.debug("Flushing memtables ...");
- table.flush(false);
- if (logger_.isDebugEnabled())
- logger_.debug("Forcing compaction ...");
- /* Get the counting bloom filter for each endpoint and the list of
files that need to be streamed */
- List<String> fileList = new ArrayList<String>();
- for (SSTableReader sstable : table.forceAntiCompaction(ranges,
target))
- {
- fileList.add(sstable.indexFilename());
- fileList.add(sstable.filterFilename());
- fileList.add(sstable.getFilename());
+ logger_.debug(bsmd.toString());
+ Streaming.transferRanges(bsmd.target_, bsmd.ranges_);
}
- doHandoff(target, fileList, tName);
- //In Handoff, Streaming the file also deletes the file, so no
cleanup needed
- }
- }
-
- /**
- * Stream the files in the bootstrap directory over to the
- * node being bootstrapped.
- */
- private void doHandoff(InetAddress target, List<String> fileList, String
table) throws IOException
- {
- List<File> filesList = new ArrayList<File>();
- for(String file : fileList)
- {
- filesList.add(new File(file));
}
- File[] files = filesList.toArray(new File[0]);
- StreamContextManager.StreamContext[] streamContexts = new
StreamContextManager.StreamContext[files.length];
- int i = 0;
- for ( File file : files )
- {
- streamContexts[i] = new
StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(),
table);
- if (logger_.isDebugEnabled())
- logger_.debug("Stream context metadata " + streamContexts[i]);
- ++i;
- }
-
- if ( files.length > 0 )
+ catch (IOException ex)
{
- /* Set up the stream manager with the files that need to streamed
*/
- StreamManager.instance(target).addFilesToStream(streamContexts);
- /* Send the bootstrap initiate message */
- BootstrapInitiateMessage biMessage = new
BootstrapInitiateMessage(streamContexts);
- Message message =
BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
- if (logger_.isDebugEnabled())
- logger_.debug("Sending a bootstrap initiate message to " +
target + " ...");
- MessagingService.instance().sendOneWay(message, target);
- if (logger_.isDebugEnabled())
- logger_.debug("Waiting for transfer to " + target + " to
complete");
- StreamManager.instance(target).waitForStreamCompletion();
- if (logger_.isDebugEnabled())
- logger_.debug("Done with transfer to " + target);
+ throw new IOError(ex);
}
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=834937&r1=834936&r2=834937&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Wed
Nov 11 16:11:36 2009
@@ -24,6 +24,7 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.Arrays;
import org.apache.log4j.Logger;
import org.apache.commons.lang.StringUtils;
@@ -128,6 +129,12 @@
return path;
}
+ /** @return full paths to all the files associated w/ this SSTable */
+ public List<String> getAllFilenames()
+ {
+ return Arrays.asList(getFilename(), indexFilename(), filterFilename());
+ }
+
public String getColumnFamilyName()
{
return columnFamilyName;
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=834937&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
(added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
Wed Nov 11 16:11:36 2009
@@ -0,0 +1,98 @@
+package org.apache.cassandra.io;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.BootstrapInitiateMessage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StreamManager;
+
+/**
+ * Static helpers that push locally stored data for a set of ranges to
+ * another endpoint using the bootstrap streaming protocol.
+ */
+public class Streaming
+{
+    private static final Logger logger = Logger.getLogger(Streaming.class);
+
+    /**
+     * Figures out the files on disk locally for the given ranges and streams
+     * them to the target endpoint using the Bootstrap protocol.
+     *
+     * For every table known to DatabaseDescriptor this will:
+     * (1) flush the memtables to disk,
+     * (2) run an anticompaction that writes the keys in the requested ranges
+     *     into files destined for the target endpoint,
+     * (3) hand the resulting files off to the target.
+     *
+     * @param target endpoint that should receive the data
+     * @param ranges ranges to transfer; must not be empty (checked with an assert)
+     * @throws IOException if flushing, anticompaction or the hand-off fails
+     */
+    public static void transferRanges(InetAddress target, Collection<Range> ranges) throws IOException
+    {
+        assert ranges.size() > 0;
+
+        if (logger.isDebugEnabled())
+            logger.debug("Beginning transfer process to " + target + " for ranges " + StringUtils.join(ranges, ", "));
+
+        List<String> tables = DatabaseDescriptor.getTables();
+        for (String tName : tables)
+        {
+            Table table = Table.open(tName);
+            if (logger.isDebugEnabled())
+                logger.debug("Flushing memtables ...");
+            table.flush(false);
+            if (logger.isDebugEnabled())
+                logger.debug("Performing anticompaction ...");
+            /* Get the list of files that need to be streamed */
+            List<String> fileList = new ArrayList<String>();
+            for (SSTableReader sstable : table.forceAntiCompaction(ranges, target))
+            {
+                fileList.addAll(sstable.getAllFilenames());
+            }
+            transferOneTable(target, fileList, tName); // also deletes the file, so no further cleanup needed
+        }
+    }
+
+    /**
+     * Streams the given files of one table over to the node being
+     * bootstrapped and blocks until the stream manager reports completion.
+     * Does nothing when there are no files to send.
+     *
+     * @param target   endpoint that should receive the files
+     * @param fileList paths of the files to stream
+     * @param table    name of the table the files belong to
+     * @throws IOException if setting up or running the transfer fails
+     */
+    private static void transferOneTable(InetAddress target, List<String> fileList, String table) throws IOException
+    {
+        if (fileList.isEmpty())
+            return;
+
+        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[fileList.size()];
+        int i = 0;
+        for (String filename : fileList)
+        {
+            File file = new File(filename);
+            StreamContextManager.StreamContext context = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), table);
+            streamContexts[i++] = context;
+            // Log the context that was just stored. Indexing the array with the
+            // already-incremented i would read the next (still null) slot and
+            // run past the end of the array on the last file.
+            if (logger.isDebugEnabled())
+                logger.debug("Stream context metadata " + context);
+        }
+
+        /* Set up the stream manager with the files that need to streamed */
+        StreamManager.instance(target).addFilesToStream(streamContexts);
+        /* Send the bootstrap initiate message */
+        BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(streamContexts);
+        Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
+        if (logger.isDebugEnabled())
+            logger.debug("Sending a bootstrap initiate message to " + target + " ...");
+        MessagingService.instance().sendOneWay(message, target);
+        if (logger.isDebugEnabled())
+            logger.debug("Waiting for transfer to " + target + " to complete");
+        StreamManager.instance(target).waitForStreamCompletion();
+        if (logger.isDebugEnabled())
+            logger.debug("Done with transfer to " + target);
+    }
+}