Author: jbellis
Date: Sun Jan 31 00:21:36 2010
New Revision: 904924
URL: http://svn.apache.org/viewvc?rev=904924&view=rev
Log:
centralize streaming code in org.apache.cassandra.streaming; split out verbs &
handlers into top-level classes
patch by jbellis; reviewed by stuhood for CASSANDRA-751
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
(contents, props changed)
- copied, changed from r904698,
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
(with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
(contents, props changed)
- copied, changed from r904698,
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
(with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
(with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
(with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
(contents, props changed)
- copied, changed from r904698,
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
(with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
(with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
(contents, props changed)
- copied, changed from r904698,
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
(with props)
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
Sun Jan 31 00:21:36 2010
@@ -27,18 +27,17 @@
import org.apache.log4j.Logger;
import org.apache.commons.lang.ArrayUtils;
- import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.streaming.Streaming;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.IFailureDetector;
- import org.apache.cassandra.io.Streaming;
import com.google.common.collect.Multimap;
import com.google.common.collect.ArrayListMultimap;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
Sun Jan 31 00:21:36 2010
@@ -25,7 +25,7 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.streaming.StreamContextManager;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
Sun Jan 31 00:21:36 2010
@@ -28,7 +28,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.streaming.Streaming;
import org.apache.log4j.Logger;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
Sun Jan 31 00:21:36 2010
@@ -27,6 +27,8 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.net.FileStreamTask;
+import org.apache.cassandra.streaming.IStreamComplete;
+import org.apache.cassandra.streaming.StreamContextManager;
public class IncomingStreamReader
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Sun Jan 31 00:21:36 2010
@@ -35,7 +35,7 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.streaming.Streaming;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Sun Jan 31 00:21:36 2010
@@ -35,7 +35,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.streaming.Streaming;
/*
* The load balancing algorithm here is an implementation of
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Sun Jan 31 00:21:36 2010
@@ -37,13 +37,12 @@
import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
+import org.apache.cassandra.streaming.*;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.Streaming;
-import org.apache.cassandra.io.StreamRequestVerbHandler;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.log4j.Logger;
@@ -210,9 +209,9 @@
// see BootStrapper for a summary of how the bootstrap verbs interact
MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN,
new BootStrapper.BootstrapTokenVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST,
new StreamRequestVerbHandler() );
- MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE,
new Streaming.StreamInitiateVerbHandler());
-
MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE_DONE, new
Streaming.StreamInitiateDoneVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.STREAM_FINISHED,
new Streaming.StreamFinishedVerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE,
new StreamInitiateVerbHandler());
+
MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE_DONE, new
StreamInitiateDoneVerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.STREAM_FINISHED,
new StreamFinishedVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.READ_RESPONSE, new
ResponseVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new
TreeRequestVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new
AntiEntropyService.TreeResponseVerbHandler());
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
(from r904698,
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java&r1=904698&r2=904924&rev=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
Sun Jan 31 00:21:36 2010
@@ -16,12 +16,14 @@
* limitations under the License.
*/
-package org.apache.cassandra.net.io;
+package org.apache.cassandra.streaming;
import java.io.IOException;
import java.net.InetAddress;
+import org.apache.cassandra.streaming.StreamContextManager;
+
public interface IStreamComplete
{
/*
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904924&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
Sun Jan 31 00:21:36 2010
@@ -0,0 +1,65 @@
+package org.apache.cassandra.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.IStreamComplete;
+import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * This is the callback handler that is invoked when we have
+ * completely received a single file from a remote host.
+ *
+ * TODO if we move this into CFS we could make addSSTables private, improving
encapsulation.
+*/
+class StreamCompletionHandler implements IStreamComplete
+{
+ private static Logger logger =
Logger.getLogger(StreamCompletionHandler.class);
+
+ public void onStreamCompletion(InetAddress host,
StreamContextManager.StreamContext streamContext,
StreamContextManager.StreamStatus streamStatus) throws IOException
+ {
+ /* Parse the stream context and the file to the list of SSTables in
the associated Column Family Store. */
+ if (streamContext.getTargetFile().contains("-Data.db"))
+ {
+ String tableName = streamContext.getTable();
+ File file = new File( streamContext.getTargetFile() );
+ String fileName = file.getName();
+ String [] temp = fileName.split("-");
+
+ //Open the file to see if all parts are now here
+ try
+ {
+ SSTableReader sstable =
SSTableWriter.renameAndOpen(streamContext.getTargetFile());
+ //TODO add a sanity check that this sstable has all its parts
and is ok
+
Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
+ logger.info("Streaming added " + sstable.getFilename());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Not able to add streamed file " +
streamContext.getTargetFile(), e);
+ }
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("Sending a streaming finished message with " +
streamStatus + " to " + host);
+ /* Send a StreamStatusMessage object which may require the source node
to re-stream certain files. */
+ StreamContextManager.StreamStatusMessage streamStatusMessage = new
StreamContextManager.StreamStatusMessage(streamStatus);
+ Message message =
StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
+ MessagingService.instance.sendOneWay(message, host);
+
+ /* If we're done with everything for this host, remove from bootstrap
sources */
+ if (StreamContextManager.isDone(host) &&
StorageService.instance.isBootstrapMode())
+ {
+ StorageService.instance.removeBootstrapSource(host,
streamContext.getTable());
+ }
+ }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
(from r904698,
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java&r1=904698&r2=904924&rev=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
Sun Jan 31 00:21:36 2010
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.cassandra.net.io;
+package org.apache.cassandra.streaming;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -27,6 +27,7 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.IStreamComplete;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -34,7 +35,7 @@
public class StreamContextManager
{
- private static Logger logger_ =
Logger.getLogger(StreamContextManager.class);
+ private static Logger logger =
Logger.getLogger(StreamContextManager.class);
public static enum StreamCompletionAction
{
@@ -160,7 +161,7 @@
return expectedBytes_;
}
- void setAction(StreamContextManager.StreamCompletionAction action)
+ public void setAction(StreamContextManager.StreamCompletionAction
action)
{
action_ = action;
}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904924&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
Sun Jan 31 00:21:36 2010
@@ -0,0 +1,50 @@
+package org.apache.cassandra.streaming;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.StreamManager;
+
+public class StreamFinishedVerbHandler implements IVerbHandler
+{
+ private static Logger logger =
Logger.getLogger(StreamFinishedVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+
+ try
+ {
+ StreamContextManager.StreamStatusMessage streamStatusMessage =
StreamContextManager.StreamStatusMessage.serializer().deserialize(new
DataInputStream(bufIn));
+ StreamContextManager.StreamStatus streamStatus =
streamStatusMessage.getStreamStatus();
+
+ switch (streamStatus.getAction())
+ {
+ case DELETE:
+
StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
+ break;
+
+ case STREAM:
+ if (logger.isDebugEnabled())
+ logger.debug("Need to re-stream file " +
streamStatus.getFile());
+ StreamManager.instance(message.getFrom()).repeat();
+ break;
+
+ default:
+ break;
+ }
+ }
+ catch (IOException ex)
+ {
+ throw new IOError(ex);
+ }
+ }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java?rev=904924&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
Sun Jan 31 00:21:36 2010
@@ -0,0 +1,19 @@
+package org.apache.cassandra.streaming;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.StreamManager;
+
+public class StreamInitiateDoneVerbHandler implements IVerbHandler
+{
+ private static Logger logger =
Logger.getLogger(StreamInitiateDoneVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Received a stream initiate done message ...");
+ StreamManager.instance(message.getFrom()).start();
+ }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904924&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
Sun Jan 31 00:21:36 2010
@@ -0,0 +1,145 @@
+package org.apache.cassandra.streaming;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class StreamInitiateVerbHandler implements IVerbHandler
+{
+ private static Logger logger =
Logger.getLogger(StreamInitiateVerbHandler.class);
+
+ /*
+ * Here we handle the StreamInitiateMessage. Here we get the
+ * array of StreamContexts. We get file names for the column
+ * families associated with the files and replace them with the
+ * file names as obtained from the column family store on the
+ * receiving end.
+ */
+ public void doVerb(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("StreamInitiateVerbeHandler.doVerb %s
%s %s", message.getVerb(), message.getMessageId(), message.getMessageType()));
+
+ try
+ {
+ StreamInitiateMessage biMsg =
StreamInitiateMessage.serializer().deserialize(new DataInputStream(bufIn));
+ StreamContextManager.StreamContext[] streamContexts =
biMsg.getStreamContext();
+
+ if (streamContexts.length == 0)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("no data needed from " + message.getFrom());
+ if (StorageService.instance.isBootstrapMode())
+
StorageService.instance.removeBootstrapSource(message.getFrom(), new
String(message.getHeader(Streaming.TABLE_NAME)));
+ return;
+ }
+
+ Map<String, String> fileNames = getNewNames(streamContexts);
+ Map<String, String> pathNames = new HashMap<String, String>();
+ for (String ssName : fileNames.keySet())
+ pathNames.put(ssName,
DatabaseDescriptor.getNextAvailableDataLocation());
+ /*
+ * For each of stream context's in the incoming message
+ * generate the new file names and store the new file names
+ * in the StreamContextManager.
+ */
+ for (StreamContextManager.StreamContext streamContext :
streamContexts )
+ {
+ StreamContextManager.StreamStatus streamStatus = new
StreamContextManager.StreamStatus(streamContext.getTargetFile(),
streamContext.getExpectedBytes() );
+ String file = getNewFileNameFromOldContextAndNames(fileNames,
pathNames, streamContext);
+
+ if (logger.isDebugEnabled())
+ logger.debug("Received Data from : " + message.getFrom() +
" " + streamContext.getTargetFile() + " " + file);
+ streamContext.setTargetFile(file);
+ addStreamContext(message.getFrom(), streamContext,
streamStatus);
+ }
+
+
StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new
StreamCompletionHandler());
+ if (logger.isDebugEnabled())
+ logger.debug("Sending a stream initiate done message ...");
+ Message doneMessage = new Message(FBUtilities.getLocalAddress(),
"", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0] );
+ MessagingService.instance.sendOneWay(doneMessage,
message.getFrom());
+ }
+ catch (IOException ex)
+ {
+ throw new IOError(ex);
+ }
+ }
+
+ public String getNewFileNameFromOldContextAndNames(Map<String, String>
fileNames,
+ Map<String, String>
pathNames,
+
StreamContextManager.StreamContext streamContext)
+ {
+ File sourceFile = new File( streamContext.getTargetFile() );
+ String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
+ String cfName = piece[0];
+ String ssTableNum = piece[1];
+ String typeOfFile = piece[2];
+
+ String newFileNameExpanded = fileNames.get(streamContext.getTable() +
"-" + cfName + "-" + ssTableNum);
+ String path = pathNames.get(streamContext.getTable() + "-" + cfName +
"-" + ssTableNum);
+ //Drop type (Data.db) from new FileName
+ String newFileName = newFileNameExpanded.replace("Data.db",
typeOfFile);
+ return path + File.separator + streamContext.getTable() +
File.separator + newFileName;
+ }
+
+ // todo: this method needs to be private, or package at the very least for
easy unit testing.
+ public Map<String, String>
getNewNames(StreamContextManager.StreamContext[] streamContexts) throws
IOException
+ {
+ /*
+ * Mapping for each file with unique CF-i ---> new file name. For eg.
+ * for a file with name <CF>-<i>-Data.db there is a corresponding
+ * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly
+ * generated file name.
+ */
+ Map<String, String> fileNames = new HashMap<String, String>();
+ /* Get the distinct entries from StreamContexts i.e have one entry per
Data/Index/Filter file set */
+ Set<String> distinctEntries = new HashSet<String>();
+ for ( StreamContextManager.StreamContext streamContext :
streamContexts )
+ {
+ String[] pieces = FBUtilities.strip(new
File(streamContext.getTargetFile()).getName(), "-");
+ distinctEntries.add(streamContext.getTable() + "-" + pieces[0] +
"-" + pieces[1] );
+ }
+
+ /* Generate unique file names per entry */
+ for ( String distinctEntry : distinctEntries )
+ {
+ String tableName;
+ String[] pieces = FBUtilities.strip(distinctEntry, "-");
+ tableName = pieces[0];
+ Table table = Table.open( tableName );
+
+ ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]);
+ if (logger.isDebugEnabled())
+ logger.debug("Generating file name for " + distinctEntry + "
...");
+ fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
+ }
+
+ return fileNames;
+ }
+
+ private void addStreamContext(InetAddress host,
StreamContextManager.StreamContext streamContext,
StreamContextManager.StreamStatus streamStatus)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Adding stream context " + streamContext + " for " +
host + " ...");
+ StreamContextManager.addStreamContext(host, streamContext,
streamStatus);
+ }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
(from r904698,
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java&r1=904698&r2=904924&rev=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
Sun Jan 31 00:21:36 2010
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.cassandra.service;
+package org.apache.cassandra.streaming;
import java.io.File;
import java.io.IOException;
@@ -28,15 +28,14 @@
import java.net.InetAddress;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.streaming.StreamContextManager;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.log4j.Logger;
-/*
- * This class manages the streaming of multiple files
- * one after the other.
+/**
+ * This class manages the streaming of multiple files one after the other.
*/
public class StreamManager
{
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=904924&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
Sun Jan 31 00:21:36 2010
@@ -0,0 +1,76 @@
+package org.apache.cassandra.streaming;
+
+import java.io.*;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+* This class encapsulates the message that needs to be sent to nodes
+* that handoff data. The message contains information about ranges
+* that need to be transferred and the target node.
+*/
+class StreamRequestMessage
+{
+ private static ICompactSerializer<StreamRequestMessage> serializer_;
+ static
+ {
+ serializer_ = new StreamRequestMessageSerializer();
+ }
+
+ protected static ICompactSerializer<StreamRequestMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ protected static Message makeStreamRequestMessage(StreamRequestMessage
streamRequestMessage)
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ try
+ {
+ StreamRequestMessage.serializer().serialize(streamRequestMessage,
dos);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ return new Message(FBUtilities.getLocalAddress(),
StageManager.STREAM_STAGE, StorageService.Verb.STREAM_REQUEST,
bos.toByteArray() );
+ }
+
+ protected StreamRequestMetadata[] streamRequestMetadata_ = new
StreamRequestMetadata[0];
+
+ // TODO only actually ever need one BM, not an array
+ StreamRequestMessage(StreamRequestMetadata... streamRequestMetadata)
+ {
+ assert streamRequestMetadata != null;
+ streamRequestMetadata_ = streamRequestMetadata;
+ }
+
+ private static class StreamRequestMessageSerializer implements
ICompactSerializer<StreamRequestMessage>
+ {
+ public void serialize(StreamRequestMessage streamRequestMessage,
DataOutputStream dos) throws IOException
+ {
+ StreamRequestMetadata[] streamRequestMetadata =
streamRequestMessage.streamRequestMetadata_;
+ dos.writeInt(streamRequestMetadata.length);
+ for (StreamRequestMetadata bsmd : streamRequestMetadata)
+ {
+ StreamRequestMetadata.serializer().serialize(bsmd, dos);
+ }
+ }
+
+ public StreamRequestMessage deserialize(DataInputStream dis) throws
IOException
+ {
+ int size = dis.readInt();
+ StreamRequestMetadata[] streamRequestMetadata = new
StreamRequestMetadata[size];
+ for (int i = 0; i < size; ++i)
+ {
+ streamRequestMetadata[i] =
StreamRequestMetadata.serializer().deserialize(dis);
+ }
+ return new StreamRequestMessage(streamRequestMetadata);
+ }
+ }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java?rev=904924&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
Sun Jan 31 00:21:36 2010
@@ -0,0 +1,80 @@
+package org.apache.cassandra.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+
+/**
+ * This encapsulates information of the list of ranges that a target
+ * node requires to be transferred. This will be bundled in a
+ * StreamRequestsMessage and sent to nodes that are going to handoff
+ * the data.
+*/
+class StreamRequestMetadata
+{
+ private static ICompactSerializer<StreamRequestMetadata> serializer_;
+ static
+ {
+ serializer_ = new StreamRequestMetadataSerializer();
+ }
+
+ protected static ICompactSerializer<StreamRequestMetadata> serializer()
+ {
+ return serializer_;
+ }
+
+ protected InetAddress target_;
+ protected Collection<Range> ranges_;
+
+ StreamRequestMetadata(InetAddress target, Collection<Range> ranges)
+ {
+ target_ = target;
+ ranges_ = ranges;
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder("");
+ sb.append(target_);
+ sb.append("------->");
+ for ( Range range : ranges_ )
+ {
+ sb.append(range);
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+
+ private static class StreamRequestMetadataSerializer implements
ICompactSerializer<StreamRequestMetadata>
+ {
+ public void serialize(StreamRequestMetadata srMetadata,
DataOutputStream dos) throws IOException
+ {
+ CompactEndPointSerializationHelper.serialize(srMetadata.target_,
dos);
+ dos.writeInt(srMetadata.ranges_.size());
+ for (Range range : srMetadata.ranges_)
+ {
+ Range.serializer().serialize(range, dos);
+ }
+ }
+
+ public StreamRequestMetadata deserialize(DataInputStream dis) throws
IOException
+ {
+ InetAddress target =
CompactEndPointSerializationHelper.deserialize(dis);
+ int size = dis.readInt();
+ List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
+ for( int i = 0; i < size; ++i )
+ {
+ ranges.add(Range.serializer().deserialize(dis));
+ }
+ return new StreamRequestMetadata( target, ranges );
+ }
+ }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
(from r904698,
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java&r1=904698&r2=904924&rev=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
Sun Jan 31 00:21:36 2010
@@ -16,30 +16,17 @@
* limitations under the License.
*/
-package org.apache.cassandra.io;
+ package org.apache.cassandra.streaming;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
-import java.io.File;
import java.io.IOException;
import java.io.IOError;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Collection;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Table;
-
-import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.StreamManager;
import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
/**
* This verb handler handles the StreamRequestMessage that is sent by
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java?rev=904924&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
Sun Jan 31 00:21:36 2010
@@ -0,0 +1,166 @@
+package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.net.InetAddress;
+import java.util.*;
+import java.io.IOException;
+import java.io.File;
+import java.io.IOError;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * This class handles streaming data from one node to another.
+ *
+ * For bootstrap,
+ * 1. BOOTSTRAP_TOKEN asks the most-loaded node what Token to use to split
its Range in two.
+ * 2. STREAM_REQUEST tells source nodes to send us the necessary Ranges
+ * 3. source nodes send STREAM_INITIATE to us to say "get ready to receive
data" [if there is data to send]
+ * 4. when we have everything set up to receive the data, we send
STREAM_INITIATE_DONE back to the source nodes and they start streaming
+ * 5. when streaming is complete, we send STREAM_FINISHED to the source so it
can clean up on its end
+ *
+ * For unbootstrap, the leaving node starts with step 3 (1 and 2 are skipped
entirely). This is why
+ * STREAM_INITIATE is a separate verb, rather than just a reply to
STREAM_REQUEST; the REQUEST is optional.
+ */
+public class Streaming
+{
+ private static Logger logger = Logger.getLogger(Streaming.class);
+ static String TABLE_NAME = "STREAMING-TABLE-NAME";
+ public static final long RING_DELAY = 30 * 1000; // delay after which we
assume ring has stablized
+
+ /**
+ * Split out files for all tables on disk locally for each range and then
stream them to the target endpoint.
+ */
+ public static void transferRanges(InetAddress target, Collection<Range>
ranges, Runnable callback)
+ {
+ assert ranges.size() > 0;
+
+ if (logger.isDebugEnabled())
+ logger.debug("Beginning transfer process to " + target + " for
ranges " + StringUtils.join(ranges, ", "));
+
+ /*
+ * (1) dump all the memtables to disk.
+ * (2) anticompaction -- split out the keys in the range specified
+ * (3) transfer the data.
+ */
+ List<String> tables = DatabaseDescriptor.getTables();
+ for (String tName : tables)
+ {
+ try
+ {
+ Table table = Table.open(tName);
+ if (logger.isDebugEnabled())
+ logger.debug("Flushing memtables ...");
+ for (Future f : table.flush())
+ {
+ try
+ {
+ f.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ if (logger.isDebugEnabled())
+ logger.debug("Performing anticompaction ...");
+ /* Get the list of files that need to be streamed */
+ transferSSTables(target, table.forceAntiCompaction(ranges,
target), tName); // SSTR GC deletes the file when done
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ if (callback != null)
+ callback.run();
+ }
+
+ /**
+ * Transfers a group of sstables from a single table to the target endpoint
+ * and then marks them as ready for local deletion.
+ */
+ public static void transferSSTables(InetAddress target,
List<SSTableReader> sstables, String table) throws IOException
+ {
+ StreamContextManager.StreamContext[] streamContexts = new
StreamContextManager.StreamContext[SSTable.FILES_ON_DISK * sstables.size()];
+ int i = 0;
+ for (SSTableReader sstable : sstables)
+ {
+ for (String filename : sstable.getAllFilenames())
+ {
+ File file = new File(filename);
+ streamContexts[i++] = new
StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(),
table);
+ }
+ }
+ if (logger.isDebugEnabled())
+ logger.debug("Stream context metadata " +
StringUtils.join(streamContexts, ", "));
+
+ StreamManager.instance(target).addFilesToStream(streamContexts);
+ StreamInitiateMessage biMessage = new
StreamInitiateMessage(streamContexts);
+ Message message =
StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
+ message.addHeader(Streaming.TABLE_NAME, table.getBytes());
+ if (logger.isDebugEnabled())
+ logger.debug("Sending a stream initiate message to " + target + "
...");
+ MessagingService.instance.sendOneWay(message, target);
+
+ if (streamContexts.length > 0)
+ {
+ logger.info("Waiting for transfer to " + target + " to complete");
+ StreamManager.instance(target).waitForStreamCompletion();
+ // (StreamManager will delete the streamed file on completion.)
+ logger.info("Done with transfer to " + target);
+ }
+ }
+
+ /**
+ * Request ranges to be transferred
+ */
+ public static void requestRanges(InetAddress source, Collection<Range>
ranges)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Requesting from " + source + " ranges " +
StringUtils.join(ranges, ", "));
+ StreamRequestMetadata streamRequestMetadata = new
StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
+ Message message = StreamRequestMessage.makeStreamRequestMessage(new
StreamRequestMessage(streamRequestMetadata));
+ MessagingService.instance.sendOneWay(message, source);
+ }
+
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Sun Jan 31 00:21:36 2010
@@ -27,8 +27,8 @@
import java.util.Map;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.StreamInitiateVerbHandler;
import org.junit.Test;
@@ -41,7 +41,7 @@
streamContexts[0] = new
StreamContextManager.StreamContext("/baz/foo/Standard1-500-Data.db", 100,
"Keyspace1");
streamContexts[1] = new
StreamContextManager.StreamContext("/bar/foo/Standard1-500-Index.db", 100,
"Keyspace1");
streamContexts[2] = new
StreamContextManager.StreamContext("/bad/foo/Standard1-500-Filter.db", 100,
"Keyspace1");
- Streaming.StreamInitiateVerbHandler bivh = new
Streaming.StreamInitiateVerbHandler();
+ StreamInitiateVerbHandler bivh = new StreamInitiateVerbHandler();
Map<String, String> fileNames = bivh.getNewNames(streamContexts);
Map<String, String> paths = new HashMap<String, String>();
for (String ssName : fileNames.keySet())
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=904924&r1=904923&r2=904924&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
Sun Jan 31 00:21:36 2010
@@ -31,6 +31,7 @@
import org.apache.cassandra.io.SSTableUtils;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.Streaming;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.Test;