Author: jbellis
Date: Sun Jan 31 00:22:29 2010
New Revision: 904925
URL: http://svn.apache.org/viewvc?rev=904925&view=rev
Log:
split Streaming into StreamOut and StreamIn; clean up StreamManager
patch by jbellis; reviewed by stuhood for CASSANDRA-751
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
(with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
(contents, props changed)
- copied, changed from r904924,
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
Sun Jan 31 00:22:29 2010
@@ -32,7 +32,7 @@
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.streaming.Streaming;
+ import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -79,7 +79,7 @@
InetAddress source = entry.getKey();
for (String table :
DatabaseDescriptor.getNonSystemTables())
StorageService.instance.addBootstrapSource(source,
table);
- Streaming.requestRanges(source, entry.getValue());
+ StreamIn.requestRanges(source, entry.getValue());
}
}
}, "Boostrap requester").start();
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
Sun Jan 31 00:22:29 2010
@@ -28,7 +28,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.Streaming;
+import org.apache.cassandra.streaming.StreamOut;
import org.apache.log4j.Logger;
@@ -415,10 +415,10 @@
Hashtable<InetAddress, Long> copy = new Hashtable<InetAddress,
Long>(justRemovedEndPoints_);
for (Map.Entry<InetAddress, Long> entry : copy.entrySet())
{
- if ((now - entry.getValue()) > Streaming.RING_DELAY)
+ if ((now - entry.getValue()) > StreamOut.RING_DELAY)
{
if (logger_.isDebugEnabled())
- logger_.debug(Streaming.RING_DELAY + " elapsed, "
+ entry.getKey() + " gossip quarantine over");
+ logger_.debug(StreamOut.RING_DELAY + " elapsed, "
+ entry.getKey() + " gossip quarantine over");
justRemovedEndPoints_.remove(entry.getKey());
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Sun Jan 31 00:22:29 2010
@@ -35,7 +35,7 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.streaming.Streaming;
+import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -628,7 +628,7 @@
{
List<Range> ranges = new ArrayList<Range>(differences);
List<SSTableReader> sstables =
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
- Streaming.transferSSTables(remote, sstables, cf.left);
+ StreamOut.transferSSTables(remote, sstables, cf.left);
}
catch(Exception e)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Sun Jan 31 00:22:29 2010
@@ -35,7 +35,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.streaming.Streaming;
+import org.apache.cassandra.streaming.StreamOut;
/*
* The load balancing algorithm here is an implementation of
@@ -365,7 +365,7 @@
Thread.sleep(100);
}
// one more sleep in case there are some stragglers
- Thread.sleep(Streaming.RING_DELAY);
+ Thread.sleep(StreamOut.RING_DELAY);
}
catch (InterruptedException e)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Sun Jan 31 00:22:29 2010
@@ -316,10 +316,10 @@
isBootstrapMode = true;
SystemTable.updateToken(token); // DON'T use setToken, that makes us
part of the ring locally which is incorrect until we are done bootstrapping
Gossiper.instance.addApplicationState(MOVE_STATE, new
ApplicationState(STATE_BOOTSTRAPPING + Delimiter +
partitioner_.getTokenFactory().toString(token)));
- logger_.info("bootstrap sleeping " + Streaming.RING_DELAY);
+ logger_.info("bootstrap sleeping " + StreamOut.RING_DELAY);
try
{
- Thread.sleep(Streaming.RING_DELAY);
+ Thread.sleep(StreamOut.RING_DELAY);
}
catch (InterruptedException e)
{
@@ -708,7 +708,7 @@
// Finally we have a list of addresses and ranges to stream.
Proceed to stream
for (Map.Entry<InetAddress, Collection<Range>> entry :
sourceRanges.asMap().entrySet())
- Streaming.requestRanges(entry.getKey(), entry.getValue());
+ StreamIn.requestRanges(entry.getKey(), entry.getValue());
}
}
@@ -1265,8 +1265,8 @@
logger_.info("DECOMMISSIONING");
startLeaving();
- logger_.info("decommission sleeping " + Streaming.RING_DELAY);
- Thread.sleep(Streaming.RING_DELAY);
+ logger_.info("decommission sleeping " + StreamOut.RING_DELAY);
+ Thread.sleep(StreamOut.RING_DELAY);
Runnable finishLeaving = new Runnable()
{
@@ -1336,7 +1336,7 @@
public void run()
{
// TODO each call to transferRanges re-flushes, this is
potentially a lot of waste
- Streaming.transferRanges(newEndpoint,
Arrays.asList(range), callback);
+ StreamOut.transferRanges(newEndpoint,
Arrays.asList(range), callback);
}
});
}
@@ -1364,8 +1364,8 @@
logger_.info("starting move. leaving token " + getLocalToken());
startLeaving();
- logger_.info("move sleeping " + Streaming.RING_DELAY);
- Thread.sleep(Streaming.RING_DELAY);
+ logger_.info("move sleeping " + StreamOut.RING_DELAY);
+ Thread.sleep(StreamOut.RING_DELAY);
Runnable finishMoving = new WrappedRunnable()
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
Sun Jan 31 00:22:29 2010
@@ -29,13 +29,13 @@
switch (streamStatus.getAction())
{
case DELETE:
-
StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
+
StreamManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
break;
case STREAM:
if (logger.isDebugEnabled())
logger.debug("Need to re-stream file " +
streamStatus.getFile());
- StreamManager.instance(message.getFrom()).repeat();
+ StreamManager.get(message.getFrom()).startNext();
break;
default:
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=904925&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
Sun Jan 31 00:22:29 2010
@@ -0,0 +1,30 @@
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.Collection;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/** Handles streaming data from other nodes in to this one (the requesting side of a transfer). */
+public class StreamIn
+{
+    private static final Logger logger = Logger.getLogger(StreamIn.class);
+
+    /**
+     * Request ranges to be transferred from source to the local node by sending source a one-way stream request message.
+     */
+    public static void requestRanges(InetAddress source, Collection<Range> ranges)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
+        StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
+        Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
+        MessagingService.instance.sendOneWay(message, source);
+    }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
Sun Jan 31 00:22:29 2010
@@ -14,6 +14,6 @@
{
if (logger.isDebugEnabled())
logger.debug("Received a stream initiate done message ...");
- StreamManager.instance(message.getFrom()).start();
+ StreamManager.get(message.getFrom()).startNext();
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
Sun Jan 31 00:22:29 2010
@@ -48,7 +48,7 @@
if (logger.isDebugEnabled())
logger.debug("no data needed from " + message.getFrom());
if (StorageService.instance.isBootstrapMode())
-
StorageService.instance.removeBootstrapSource(message.getFrom(), new
String(message.getHeader(Streaming.TABLE_NAME)));
+
StorageService.instance.removeBootstrapSource(message.getFrom(), new
String(message.getHeader(StreamOut.TABLE_NAME)));
return;
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
Sun Jan 31 00:22:29 2010
@@ -31,6 +31,7 @@
import org.apache.cassandra.streaming.StreamContextManager;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.SimpleCondition;
import org.apache.log4j.Logger;
@@ -39,88 +40,82 @@
*/
public class StreamManager
{
- private static Logger logger_ = Logger.getLogger( StreamManager.class );
+ private static Logger logger = Logger.getLogger( StreamManager.class );
- private static ConcurrentMap<InetAddress, StreamManager> streamManagers_ =
new ConcurrentHashMap<InetAddress, StreamManager>();
-
- public static StreamManager instance(InetAddress to)
+ private static ConcurrentMap<InetAddress, StreamManager> streamManagers =
new ConcurrentHashMap<InetAddress, StreamManager>();
+
+ public static StreamManager get(InetAddress to)
{
- StreamManager streamManager = streamManagers_.get(to);
- if ( streamManager == null )
+ StreamManager streamManager = streamManagers.get(to);
+ if (streamManager == null)
{
StreamManager possibleNew = new StreamManager(to);
- if ((streamManager = streamManagers_.putIfAbsent(to, possibleNew))
== null)
+ if ((streamManager = streamManagers.putIfAbsent(to, possibleNew))
== null)
streamManager = possibleNew;
}
return streamManager;
}
- private List<File> filesToStream_ = new ArrayList<File>();
- private InetAddress to_;
- private long totalBytesToStream_ = 0L;
+ private final List<File> files = new ArrayList<File>();
+ private final InetAddress to;
+ private long totalBytes = 0L;
+ private final SimpleCondition condition = new SimpleCondition();
private StreamManager(InetAddress to)
{
- to_ = to;
+ this.to = to;
}
public void addFilesToStream(StreamContextManager.StreamContext[]
streamContexts)
{
- for ( StreamContextManager.StreamContext streamContext :
streamContexts )
+ for (StreamContextManager.StreamContext streamContext : streamContexts)
{
- if (logger_.isDebugEnabled())
- logger_.debug("Adding file " + streamContext.getTargetFile() + "
to be streamed.");
- filesToStream_.add( new File( streamContext.getTargetFile() ) );
- totalBytesToStream_ += streamContext.getExpectedBytes();
+ if (logger.isDebugEnabled())
+ logger.debug("Adding file " + streamContext.getTargetFile() + "
to be streamed.");
+ files.add( new File( streamContext.getTargetFile() ) );
+ totalBytes += streamContext.getExpectedBytes();
}
}
- public void start()
+ public void startNext()
{
- if ( filesToStream_.size() > 0 )
+ if (files.size() > 0)
{
- File file = filesToStream_.get(0);
- if (logger_.isDebugEnabled())
- logger_.debug("Streaming " + file.length() + " length file " +
file + " ...");
- MessagingService.instance.stream(file.getAbsolutePath(), 0L,
file.length(), FBUtilities.getLocalAddress(), to_);
+ File file = files.get(0);
+ if (logger.isDebugEnabled())
+ logger.debug("Streaming " + file.length() + " length file " +
file + " ...");
+ MessagingService.instance.stream(file.getAbsolutePath(), 0L,
file.length(), FBUtilities.getLocalAddress(), to);
}
}
-
- public void repeat()
- {
- if ( filesToStream_.size() > 0 )
- start();
- }
-
- public void finish(String file) throws IOException
+
+ public void finishAndStartNext(String file) throws IOException
{
File f = new File(file);
- if (logger_.isDebugEnabled())
- logger_.debug("Deleting file " + file + " after streaming " +
f.length() + "/" + totalBytesToStream_ + " bytes.");
+ if (logger.isDebugEnabled())
+ logger.debug("Deleting file " + file + " after streaming " +
f.length() + "/" + totalBytes + " bytes.");
FileUtils.delete(file);
- filesToStream_.remove(0);
- if ( filesToStream_.size() > 0 )
- start();
+ files.remove(0);
+ if (files.size() > 0)
+ {
+ startNext();
+ }
else
{
- synchronized(this)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Signalling that streaming is done for " +
to_);
- notifyAll();
- }
+ if (logger.isDebugEnabled())
+ logger.debug("Signalling that streaming is done for " + to);
+ condition.signalAll();
}
}
- public synchronized void waitForStreamCompletion()
+ public void waitForStreamCompletion()
{
try
{
- wait();
+ condition.await();
}
- catch (InterruptedException ex)
+ catch (InterruptedException e)
{
- throw new AssertionError(ex);
+ throw new AssertionError(e);
}
}
}
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
(from r904924,
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java&r1=904924&r2=904925&rev=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/Streaming.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
Sun Jan 31 00:22:29 2010
@@ -42,7 +42,6 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamManager;
-import org.apache.cassandra.utils.FBUtilities;
/**
* This class handles streaming data from one node to another.
@@ -57,9 +56,10 @@
* For unbootstrap, the leaving node starts with step 3 (1 and 2 are skipped
entirely). This is why
* STREAM_INITIATE is a separate verb, rather than just a reply to
STREAM_REQUEST; the REQUEST is optional.
*/
-public class Streaming
+public class StreamOut
{
- private static Logger logger = Logger.getLogger(Streaming.class);
+ private static Logger logger = Logger.getLogger(StreamOut.class);
+
static String TABLE_NAME = "STREAMING-TABLE-NAME";
public static final long RING_DELAY = 30 * 1000; // delay after which we
assume ring has stablized
@@ -134,10 +134,10 @@
if (logger.isDebugEnabled())
logger.debug("Stream context metadata " +
StringUtils.join(streamContexts, ", "));
- StreamManager.instance(target).addFilesToStream(streamContexts);
+ StreamManager.get(target).addFilesToStream(streamContexts);
StreamInitiateMessage biMessage = new
StreamInitiateMessage(streamContexts);
Message message =
StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
- message.addHeader(Streaming.TABLE_NAME, table.getBytes());
+ message.addHeader(StreamOut.TABLE_NAME, table.getBytes());
if (logger.isDebugEnabled())
logger.debug("Sending a stream initiate message to " + target + "
...");
MessagingService.instance.sendOneWay(message, target);
@@ -145,22 +145,10 @@
if (streamContexts.length > 0)
{
logger.info("Waiting for transfer to " + target + " to complete");
- StreamManager.instance(target).waitForStreamCompletion();
+ StreamManager.get(target).waitForStreamCompletion();
// (StreamManager will delete the streamed file on completion.)
logger.info("Done with transfer to " + target);
}
}
- /**
- * Request ranges to be transferred
- */
- public static void requestRanges(InetAddress source, Collection<Range>
ranges)
- {
- if (logger.isDebugEnabled())
- logger.debug("Requesting from " + source + " ranges " +
StringUtils.join(ranges, ", "));
- StreamRequestMetadata streamRequestMetadata = new
StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
- Message message = StreamRequestMessage.makeStreamRequestMessage(new
StreamRequestMessage(streamRequestMetadata));
- MessagingService.instance.sendOneWay(message, source);
- }
-
}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
Sun Jan 31 00:22:29 2010
@@ -52,7 +52,7 @@
{
if (logger_.isDebugEnabled())
logger_.debug(srm.toString());
- Streaming.transferRanges(srm.target_, srm.ranges_, null);
+ StreamOut.transferRanges(srm.target_, srm.ranges_, null);
}
}
catch (IOException ex)
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=904925&r1=904924&r2=904925&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
Sun Jan 31 00:22:29 2010
@@ -31,7 +31,7 @@
import org.apache.cassandra.io.SSTableUtils;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.Streaming;
+import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.Test;
@@ -54,7 +54,7 @@
String cfname = sstable.getColumnFamilyName();
// transfer
- Streaming.transferSSTables(LOCAL, Arrays.asList(sstable), tablename);
+ StreamOut.transferSSTables(LOCAL, Arrays.asList(sstable), tablename);
// confirm that the SSTable was transferred and registered
ColumnFamilyStore cfstore =
Table.open(tablename).getColumnFamilyStore(cfname);