Author: jbellis
Date: Sun Jan 31 00:29:37 2010
New Revision: 904932
URL: http://svn.apache.org/viewvc?rev=904932&view=rev
Log:
Move IncomingStreamReader, StreamInitiateMessage, and BootstrapTest to the
streaming package. Remove the 'public' modifier from streaming classes that
don't need it.
patch by jbellis; reviewed by stuhood for CASSANDRA-751
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(contents, props changed)
- copied, changed from r904930,
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
(contents, props changed)
- copied, changed from r904930,
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
(contents, props changed)
- copied, changed from r904930,
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
Sun Jan 31 00:29:37 2010
@@ -28,7 +28,6 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamOut;
import org.apache.log4j.Logger;
@@ -415,10 +414,10 @@
Hashtable<InetAddress, Long> copy = new Hashtable<InetAddress,
Long>(justRemovedEndPoints_);
for (Map.Entry<InetAddress, Long> entry : copy.entrySet())
{
- if ((now - entry.getValue()) > StreamOut.RING_DELAY)
+ if ((now - entry.getValue()) > StorageService.RING_DELAY)
{
if (logger_.isDebugEnabled())
- logger_.debug(StreamOut.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
+ logger_.debug(StorageService.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
justRemovedEndPoints_.remove(entry.getKey());
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Sun Jan 31 00:29:37 2010
@@ -2,12 +2,10 @@
import java.io.*;
import java.net.Socket;
-import java.nio.ByteBuffer;
import org.apache.log4j.Logger;
-import org.apache.cassandra.net.io.IncomingStreamReader;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.streaming.IncomingStreamReader;
public class IncomingTcpConnection extends Thread
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Sun Jan 31 00:29:37 2010
@@ -35,7 +35,6 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.streaming.StreamOut;
/*
* The load balancing algorithm here is an implementation of
@@ -365,7 +364,7 @@
Thread.sleep(100);
}
// one more sleep in case there are some stragglers
- Thread.sleep(StreamOut.RING_DELAY);
+ Thread.sleep(StorageService.RING_DELAY);
}
catch (InterruptedException e)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Sun Jan 31 00:29:37 2010
@@ -62,6 +62,8 @@
{
private static Logger logger_ = Logger.getLogger(StorageService.class);
+ public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
+
public final static String MOVE_STATE = "MOVE";
// this must be a char that cannot be present in any token
@@ -316,10 +318,10 @@
isBootstrapMode = true;
SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
Gossiper.instance.addApplicationState(MOVE_STATE, new
ApplicationState(STATE_BOOTSTRAPPING + Delimiter +
partitioner_.getTokenFactory().toString(token)));
- logger_.info("bootstrap sleeping " + StreamOut.RING_DELAY);
+ logger_.info("bootstrap sleeping " + RING_DELAY);
try
{
- Thread.sleep(StreamOut.RING_DELAY);
+ Thread.sleep(RING_DELAY);
}
catch (InterruptedException e)
{
@@ -1265,8 +1267,8 @@
logger_.info("DECOMMISSIONING");
startLeaving();
- logger_.info("decommission sleeping " + StreamOut.RING_DELAY);
- Thread.sleep(StreamOut.RING_DELAY);
+ logger_.info("decommission sleeping " + RING_DELAY);
+ Thread.sleep(RING_DELAY);
Runnable finishLeaving = new Runnable()
{
@@ -1364,8 +1366,8 @@
logger_.info("starting move. leaving token " + getLocalToken());
startLeaving();
- logger_.info("move sleeping " + StreamOut.RING_DELAY);
- Thread.sleep(StreamOut.RING_DELAY);
+ logger_.info("move sleeping " + RING_DELAY);
+ Thread.sleep(RING_DELAY);
Runnable finishMoving = new WrappedRunnable()
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
Sun Jan 31 00:29:37 2010
@@ -10,7 +10,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public class CompletedFileStatus
+class CompletedFileStatus
{
private static ICompactSerializer<CompletedFileStatus> serializer_;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
Sun Jan 31 00:29:37 2010
@@ -22,7 +22,7 @@
import java.net.InetAddress;
-public interface IStreamComplete
+interface IStreamComplete
{
/*
* This callback if registered with the StreamContextManager is
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(from r904930,
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.cassandra.net.io;
+package org.apache.cassandra.streaming;
import java.net.InetSocketAddress;
import java.net.InetAddress;
@@ -27,10 +27,6 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.net.FileStreamTask;
-import org.apache.cassandra.streaming.CompletedFileStatus;
-import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.InitiatedFile;
-import org.apache.cassandra.streaming.StreamInManager;
public class IncomingStreamReader
{
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
Sun Jan 31 00:29:37 2010
@@ -6,7 +6,7 @@
import org.apache.cassandra.io.ICompactSerializer;
-public class InitiatedFile
+class InitiatedFile
{
private static ICompactSerializer<InitiatedFile> serializer_;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
Sun Jan 31 00:29:37 2010
@@ -29,7 +29,7 @@
import org.apache.log4j.Logger;
-public class StreamInManager
+class StreamInManager
{
private static final Logger logger =
Logger.getLogger(StreamInManager.class);
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
(from r904930,
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.cassandra.dht;
+package org.apache.cassandra.streaming;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -29,7 +29,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public class StreamInitiateMessage
+class StreamInitiateMessage
{
private static ICompactSerializer<StreamInitiateMessage> serializer_;
@@ -62,32 +62,31 @@
{
return streamContexts_;
}
-}
-class StreamInitiateMessageSerializer implements
ICompactSerializer<StreamInitiateMessage>
-{
- public void serialize(StreamInitiateMessage bim, DataOutputStream dos)
throws IOException
+ private static class StreamInitiateMessageSerializer implements
ICompactSerializer<StreamInitiateMessage>
{
- dos.writeInt(bim.streamContexts_.length);
- for ( InitiatedFile initiatedFile : bim.streamContexts_ )
+ public void serialize(StreamInitiateMessage bim, DataOutputStream dos)
throws IOException
{
- InitiatedFile.serializer().serialize(initiatedFile, dos);
+ dos.writeInt(bim.streamContexts_.length);
+ for ( InitiatedFile initiatedFile : bim.streamContexts_ )
+ {
+ InitiatedFile.serializer().serialize(initiatedFile, dos);
+ }
}
- }
-
- public StreamInitiateMessage deserialize(DataInputStream dis) throws
IOException
- {
- int size = dis.readInt();
- InitiatedFile[] initiatedFiles = new InitiatedFile[0];
- if ( size > 0 )
+
+ public StreamInitiateMessage deserialize(DataInputStream dis) throws
IOException
{
- initiatedFiles = new InitiatedFile[size];
- for ( int i = 0; i < size; ++i )
+ int size = dis.readInt();
+ InitiatedFile[] initiatedFiles = new InitiatedFile[0];
+ if ( size > 0 )
{
- initiatedFiles[i] =
InitiatedFile.serializer().deserialize(dis);
+ initiatedFiles = new InitiatedFile[size];
+ for ( int i = 0; i < size; ++i )
+ {
+ initiatedFiles[i] =
InitiatedFile.serializer().deserialize(dis);
+ }
}
+ return new StreamInitiateMessage(initiatedFiles);
}
- return new StreamInitiateMessage(initiatedFiles);
}
}
-
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
Sun Jan 31 00:29:37 2010
@@ -12,7 +12,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.streaming.StreamInitiateMessage;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
Sun Jan 31 00:29:37 2010
@@ -33,7 +33,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.streaming.StreamInitiateMessage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.SSTable;
@@ -60,7 +60,6 @@
private static Logger logger = Logger.getLogger(StreamOut.class);
static String TABLE_NAME = "STREAMING-TABLE-NAME";
- public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
/**
* Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
Sun Jan 31 00:29:37 2010
@@ -37,7 +37,7 @@
/**
* This class manages the streaming of multiple files one after the other.
*/
-public class StreamOutManager
+class StreamOutManager
{
private static Logger logger = Logger.getLogger( StreamOutManager.class );
Copied:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
(from r904930,
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?p2=incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java&p1=incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.cassandra.dht;
+package org.apache.cassandra.streaming;
import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.*;
@@ -27,8 +27,6 @@
import java.util.Map;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.streaming.InitiatedFile;
-import org.apache.cassandra.streaming.StreamInitiateVerbHandler;
import org.junit.Test;
Propchange:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
------------------------------------------------------------------------------
svn:eol-style = native