Author: jbellis
Date: Sun Jan 31 00:29:37 2010
New Revision: 904932

URL: http://svn.apache.org/viewvc?rev=904932&view=rev
Log:
move IncomingStreamReader, StreamInitiateMessage, and BootstrapTest to 
streaming package.  r/m 'public' modifier from streaming classes that don't 
need it
patch by jbellis; reviewed by stuhood for CASSANDRA-751

Added:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
   (contents, props changed)
      - copied, changed from r904930, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
   (contents, props changed)
      - copied, changed from r904930, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/
    
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
   (contents, props changed)
      - copied, changed from r904930, 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Removed:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
    
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java 
Sun Jan 31 00:29:37 2010
@@ -28,7 +28,6 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamOut;
 
 import org.apache.log4j.Logger;
 
@@ -415,10 +414,10 @@
                 Hashtable<InetAddress, Long> copy = new Hashtable<InetAddress, 
Long>(justRemovedEndPoints_);
                 for (Map.Entry<InetAddress, Long> entry : copy.entrySet())
                 {
-                    if ((now - entry.getValue()) > StreamOut.RING_DELAY)
+                    if ((now - entry.getValue()) > StorageService.RING_DELAY)
                     {
                         if (logger_.isDebugEnabled())
-                            logger_.debug(StreamOut.RING_DELAY + " elapsed, " 
+ entry.getKey() + " gossip quarantine over");
+                            logger_.debug(StorageService.RING_DELAY + " 
elapsed, " + entry.getKey() + " gossip quarantine over");
                         justRemovedEndPoints_.remove(entry.getKey());
                     }
                 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
 Sun Jan 31 00:29:37 2010
@@ -2,12 +2,10 @@
 
 import java.io.*;
 import java.net.Socket;
-import java.nio.ByteBuffer;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.net.io.IncomingStreamReader;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.streaming.IncomingStreamReader;
 
 public class IncomingTcpConnection extends Thread
 {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
 Sun Jan 31 00:29:37 2010
@@ -35,7 +35,6 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.streaming.StreamOut;
 
 /*
  * The load balancing algorithm here is an implementation of
@@ -365,7 +364,7 @@
                 Thread.sleep(100);
             }
             // one more sleep in case there are some stragglers
-            Thread.sleep(StreamOut.RING_DELAY);
+            Thread.sleep(StorageService.RING_DELAY);
         }
         catch (InterruptedException e)
         {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
 Sun Jan 31 00:29:37 2010
@@ -62,6 +62,8 @@
 {
     private static Logger logger_ = Logger.getLogger(StorageService.class);    
 
 
+    public static final long RING_DELAY = 30 * 1000; // delay after which we 
assume ring has stablized
+
     public final static String MOVE_STATE = "MOVE";
 
     // this must be a char that cannot be present in any token
@@ -316,10 +318,10 @@
         isBootstrapMode = true;
         SystemTable.updateToken(token); // DON'T use setToken, that makes us 
part of the ring locally which is incorrect until we are done bootstrapping
         Gossiper.instance.addApplicationState(MOVE_STATE, new 
ApplicationState(STATE_BOOTSTRAPPING + Delimiter + 
partitioner_.getTokenFactory().toString(token)));
-        logger_.info("bootstrap sleeping " + StreamOut.RING_DELAY);
+        logger_.info("bootstrap sleeping " + RING_DELAY);
         try
         {
-            Thread.sleep(StreamOut.RING_DELAY);
+            Thread.sleep(RING_DELAY);
         }
         catch (InterruptedException e)
         {
@@ -1265,8 +1267,8 @@
 
         logger_.info("DECOMMISSIONING");
         startLeaving();
-        logger_.info("decommission sleeping " + StreamOut.RING_DELAY);
-        Thread.sleep(StreamOut.RING_DELAY);
+        logger_.info("decommission sleeping " + RING_DELAY);
+        Thread.sleep(RING_DELAY);
 
         Runnable finishLeaving = new Runnable()
         {
@@ -1364,8 +1366,8 @@
 
         logger_.info("starting move. leaving token " + getLocalToken());
         startLeaving();
-        logger_.info("move sleeping " + StreamOut.RING_DELAY);
-        Thread.sleep(StreamOut.RING_DELAY);
+        logger_.info("move sleeping " + RING_DELAY);
+        Thread.sleep(RING_DELAY);
 
         Runnable finishMoving = new WrappedRunnable()
         {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
 Sun Jan 31 00:29:37 2010
@@ -10,7 +10,7 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class CompletedFileStatus
+class CompletedFileStatus
 {
     private static ICompactSerializer<CompletedFileStatus> serializer_;
 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
 Sun Jan 31 00:29:37 2010
@@ -22,7 +22,7 @@
 
 import java.net.InetAddress;
 
-public interface IStreamComplete
+interface IStreamComplete
 {
     /*
      * This callback if registered with the StreamContextManager is 

Copied: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (from r904930, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java)
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.net.io;
+package org.apache.cassandra.streaming;
 
 import java.net.InetSocketAddress;
 import java.net.InetAddress;
@@ -27,10 +27,6 @@
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.net.FileStreamTask;
-import org.apache.cassandra.streaming.CompletedFileStatus;
-import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.InitiatedFile;
-import org.apache.cassandra.streaming.StreamInManager;
 
 public class IncomingStreamReader
 {

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
 Sun Jan 31 00:29:37 2010
@@ -6,7 +6,7 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 
-public class InitiatedFile
+class InitiatedFile
 {
     private static ICompactSerializer<InitiatedFile> serializer_;
 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
 Sun Jan 31 00:29:37 2010
@@ -29,7 +29,7 @@
 
 import org.apache.log4j.Logger;
 
-public class StreamInManager
+class StreamInManager
 {
     private static final Logger logger = 
Logger.getLogger(StreamInManager.class);
 

Copied: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
 (from r904930, 
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java)
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
 Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.dht;
+package org.apache.cassandra.streaming;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -29,7 +29,7 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class StreamInitiateMessage
+class StreamInitiateMessage
 {
     private static ICompactSerializer<StreamInitiateMessage> serializer_;
 
@@ -62,32 +62,31 @@
     {
         return streamContexts_;
     }
-}
 
-class StreamInitiateMessageSerializer implements 
ICompactSerializer<StreamInitiateMessage>
-{
-    public void serialize(StreamInitiateMessage bim, DataOutputStream dos) 
throws IOException
+    private static class StreamInitiateMessageSerializer implements 
ICompactSerializer<StreamInitiateMessage>
     {
-        dos.writeInt(bim.streamContexts_.length);
-        for ( InitiatedFile initiatedFile : bim.streamContexts_ )
+        public void serialize(StreamInitiateMessage bim, DataOutputStream dos) 
throws IOException
         {
-            InitiatedFile.serializer().serialize(initiatedFile, dos);
+            dos.writeInt(bim.streamContexts_.length);
+            for ( InitiatedFile initiatedFile : bim.streamContexts_ )
+            {
+                InitiatedFile.serializer().serialize(initiatedFile, dos);
+            }
         }
-    }
-    
-    public StreamInitiateMessage deserialize(DataInputStream dis) throws 
IOException
-    {
-        int size = dis.readInt();
-        InitiatedFile[] initiatedFiles = new InitiatedFile[0];
-        if ( size > 0 )
+
+        public StreamInitiateMessage deserialize(DataInputStream dis) throws 
IOException
         {
-            initiatedFiles = new InitiatedFile[size];
-            for ( int i = 0; i < size; ++i )
+            int size = dis.readInt();
+            InitiatedFile[] initiatedFiles = new InitiatedFile[0];
+            if ( size > 0 )
             {
-                initiatedFiles[i] = 
InitiatedFile.serializer().deserialize(dis);
+                initiatedFiles = new InitiatedFile[size];
+                for ( int i = 0; i < size; ++i )
+                {
+                    initiatedFiles[i] = 
InitiatedFile.serializer().deserialize(dis);
+                }
             }
+            return new StreamInitiateMessage(initiatedFiles);
         }
-        return new StreamInitiateMessage(initiatedFiles);
     }
 }
-

Propchange: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
 Sun Jan 31 00:29:37 2010
@@ -12,7 +12,7 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.streaming.StreamInitiateMessage;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
 Sun Jan 31 00:29:37 2010
@@ -33,7 +33,7 @@
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.streaming.StreamInitiateMessage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.SSTable;
@@ -60,7 +60,6 @@
     private static Logger logger = Logger.getLogger(StreamOut.class);
 
     static String TABLE_NAME = "STREAMING-TABLE-NAME";
-    public static final long RING_DELAY = 30 * 1000; // delay after which we 
assume ring has stablized
 
     /**
      * Split out files for all tables on disk locally for each range and then 
stream them to the target endpoint.

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
 Sun Jan 31 00:29:37 2010
@@ -37,7 +37,7 @@
 /**
  * This class manages the streaming of multiple files one after the other.
 */
-public class StreamOutManager
+class StreamOutManager
 {   
     private static Logger logger = Logger.getLogger( StreamOutManager.class );
         

Copied: 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
 (from r904930, 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java)
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?p2=incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java&p1=incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java 
(original)
+++ 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
 Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
 * specific language governing permissions and limitations
 * under the License.
 */
-package org.apache.cassandra.dht;
+package org.apache.cassandra.streaming;
 
 import static junit.framework.Assert.assertEquals;
 import static org.junit.Assert.*;
@@ -27,8 +27,6 @@
 import java.util.Map;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.streaming.InitiatedFile;
-import org.apache.cassandra.streaming.StreamInitiateVerbHandler;
 
 import org.junit.Test;
 

Propchange: 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to