Author: jbellis
Date: Sun Jan 31 00:23:42 2010
New Revision: 904929

URL: http://svn.apache.org/viewvc?rev=904929&view=rev
Log:
Remove unneeded StreamStatusMessage wrapper
patch by jbellis; reviewed by stuhood for CASSANDRA-751

Modified:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
 Sun Jan 31 00:23:42 2010
@@ -51,10 +51,8 @@
 
         if (logger.isDebugEnabled())
           logger.debug("Sending a streaming finished message with " + 
streamStatus + " to " + host);
-        /* Send a StreamStatusMessage object which may require the source node 
to re-stream certain files. */
-        StreamInManager.StreamStatusMessage streamStatusMessage = new 
StreamInManager.StreamStatusMessage(streamStatus);
-        Message message = 
StreamInManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
-        MessagingService.instance.sendOneWay(message, host);
+        /* Send a StreamStatus message which may require the source node to 
re-stream certain files. */
+        
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), 
host);
 
         /* If we're done with everything for this host, remove from bootstrap 
sources */
         if (StreamInManager.isDone(host) && 
StorageService.instance.isBootstrapMode())

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
 Sun Jan 31 00:23:42 2010
@@ -23,8 +23,7 @@
 
         try
         {
-            StreamInManager.StreamStatusMessage streamStatusMessage = 
StreamInManager.StreamStatusMessage.serializer().deserialize(new 
DataInputStream(bufIn));
-            StreamInManager.StreamStatus streamStatus = 
streamStatusMessage.getStreamStatus();
+            StreamInManager.StreamStatus streamStatus = 
StreamInManager.StreamStatus.serializer().deserialize(new 
DataInputStream(bufIn));
 
             switch (streamStatus.getAction())
             {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
 Sun Jan 31 00:23:42 2010
@@ -87,6 +87,14 @@
         {
             return action_;
         }
+
+        public Message makeStreamStatusMessage() throws IOException
+        {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream( bos );
+            StreamStatus.serializer().serialize(this, dos);
+            return new Message(FBUtilities.getLocalAddress(), "", 
StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
+        }
     }
     
     public static class StreamStatusSerializer implements 
ICompactSerializer<StreamStatus>
@@ -117,56 +125,7 @@
             return streamStatus;
         }
     }
-    
-    public static class StreamStatusMessage
-    {
-        private static ICompactSerializer<StreamStatusMessage> serializer_;
-        
-        static 
-        {
-            serializer_ = new StreamStatusMessageSerializer();
-        }
-        
-        public static ICompactSerializer<StreamStatusMessage> serializer()
-        {
-            return serializer_;
-        }
-        
-        public static Message makeStreamStatusMessage(StreamStatusMessage 
streamStatusMessage) throws IOException
-        {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream( bos );
-            StreamStatusMessage.serializer().serialize(streamStatusMessage, 
dos);
-            return new Message(FBUtilities.getLocalAddress(), "", 
StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
-        }
-        
-        protected StreamInManager.StreamStatus streamStatus_;
-        
-        public StreamStatusMessage(StreamInManager.StreamStatus streamStatus)
-        {
-            streamStatus_ = streamStatus;
-        }
-        
-        public StreamInManager.StreamStatus getStreamStatus()
-        {
-            return streamStatus_;
-        }
-    }
-    
-    public static class StreamStatusMessageSerializer implements 
ICompactSerializer<StreamStatusMessage>
-    {
-        public void serialize(StreamStatusMessage streamStatusMessage, 
DataOutputStream dos) throws IOException
-        {
-            
StreamStatus.serializer().serialize(streamStatusMessage.streamStatus_, dos);    
        
-        }
-        
-        public StreamStatusMessage deserialize(DataInputStream dis) throws 
IOException
-        {            
-            StreamInManager.StreamStatus streamStatus = 
StreamStatus.serializer().deserialize(dis);
-            return new StreamStatusMessage(streamStatus);
-        }
-    }
-        
+                
     /* Maintain a stream context per host that is the source of the stream */
     public static final Map<InetAddress, List<InitiatedFile>> ctxBag_ = new 
Hashtable<InetAddress, List<InitiatedFile>>();
     /* Maintain in this map the status of the streams that need to be sent 
back to the source */


Reply via email to