Author: gdusbabek
Date: Wed May 26 18:56:37 2010
New Revision: 948536

URL: http://svn.apache.org/viewvc?rev=948536&view=rev
Log:
rename StreamCompletionHandler to FileStatusHandler. patch by stuhood, reviewed 
by gdusbabek. CASSANDRA-1019

Added:
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
      - copied, changed from r948535, 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
Removed:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
Modified:
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java

Copied: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java 
(from r948535, 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java)
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?p2=cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java&p1=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java&r1=948535&r2=948536&rev=948536&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java 
Wed May 26 18:56:37 2010
@@ -30,35 +30,42 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.StreamInManager;
 import org.apache.cassandra.service.StorageService;
 
 /**
- * This is the callback handler that is invoked when we have
- * completely received a single file from a remote host.
- *
- * TODO if we move this into CFS we could make addSSTables private, improving 
encapsulation.
+ * This is the callback handler that is invoked on the receiving node when a 
file changes status from RECEIVE to either
+ * FileStatus.STREAM (needs to be restreamed) or FileStatus.DELETE 
(successfully completed).
 */
-class StreamCompletionHandler implements IStreamComplete
+class FileStatusHandler
 {
-    private static Logger logger = 
LoggerFactory.getLogger(StreamCompletionHandler.class);
+    private static Logger logger = 
LoggerFactory.getLogger(FileStatusHandler.class);
 
-    public void onStreamCompletion(InetAddress host, PendingFile pendingFile, 
FileStatus streamStatus) throws IOException
+    public void onStatusChange(InetAddress host, PendingFile pendingFile, 
FileStatus streamStatus) throws IOException
     {
-        /* Parse the stream context and the file to the list of SSTables in 
the associated Column Family Store. */
-        if (pendingFile.getFilename().contains("-Data.db"))
+        if (FileStatus.StreamCompletionAction.STREAM == 
streamStatus.getAction())
         {
+            // file needs to be restreamed
+            logger.warn("Streaming of file " + pendingFile + " from " + host + 
" failed, but will be retried.");
+            // request that the source node re-stream the file
+            
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), 
host);
+            return;
+        }
+        assert FileStatus.StreamCompletionAction.DELETE == 
streamStatus.getAction() :
+            "Unknown stream status: " + streamStatus.getAction();
+
+        // file was successfully streamed: if it was the last component of an 
sstable, assume that the rest
+        // have already arrived
+        if (pendingFile.getFilename().endsWith("-Data.db"))
+        {
+            // last component triggers add: see TODO in 
SSTable.getAllComponents()
             String tableName = pendingFile.getDescriptor().ksname;
-            File file = new File( pendingFile.getFilename() );
+            File file = new File(pendingFile.getFilename());
             String fileName = file.getName();
             String [] temp = fileName.split("-");
 
-            //Open the file to see if all parts are now here
             try
             {
                 SSTableReader sstable = 
SSTableWriter.renameAndOpen(pendingFile.getFilename());
-                //TODO add a sanity check that this sstable has all its parts 
and is ok
                 
Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
                 logger.info("Streaming added " + sstable.getFilename());
             }
@@ -68,12 +75,12 @@ class StreamCompletionHandler implements
             }
         }
 
+        // send a StreamStatus message telling the source node it can delete 
this file
         if (logger.isDebugEnabled())
-          logger.debug("Sending a streaming finished message with " + 
streamStatus + " to " + host);
-        /* Send a StreamStatus message which may require the source node to 
re-stream certain files. */
+            logger.debug("Sending a streaming finished message for " + 
pendingFile + " to " + host);
         
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), 
host);
 
-        /* If we're done with everything for this host, remove from bootstrap 
sources */
+        // if all files have been received from this host, remove from 
bootstrap sources
         if (StreamInManager.isDone(host) && 
StorageService.instance.isBootstrapMode())
         {
             StorageService.instance.removeBootstrapSource(host, 
pendingFile.getDescriptor().ksname);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=948536&r1=948535&r2=948536&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Wed May 26 18:56:37 2010
@@ -69,7 +69,7 @@ public class IncomingStreamReader
         {
             /* Ask the source node to re-stream this file. */
             streamStatus.setAction(FileStatus.StreamCompletionAction.STREAM);
-            handleStreamCompletion(remoteAddress.getAddress());
+            handleFileStatus(remoteAddress.getAddress());
             /* Delete the orphaned file. */
             File file = new File(pendingFile.getFilename());
             file.delete();
@@ -88,18 +88,18 @@ public class IncomingStreamReader
                 logger.debug("Removing stream context " + pendingFile);
             }
             fc.close();
-            handleStreamCompletion(remoteAddress.getAddress());
+            handleFileStatus(remoteAddress.getAddress());
         }
     }
 
-    private void handleStreamCompletion(InetAddress remoteHost) throws 
IOException
+    private void handleFileStatus(InetAddress remoteHost) throws IOException
     {
         /*
          * Streaming is complete. If all the data that has to be received 
inform the sender via
          * the stream completion callback so that the source may perform the 
requisite cleanup.
         */
-        IStreamComplete streamComplete = 
StreamInManager.getStreamCompletionHandler(remoteHost);
-        if (streamComplete != null)
-            streamComplete.onStreamCompletion(remoteHost, pendingFile, 
streamStatus);
+        FileStatusHandler handler = 
StreamInManager.getFileStatusHandler(remoteHost);
+        if (handler != null)
+            handler.onStatusChange(remoteHost, pendingFile, streamStatus);
     }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=948536&r1=948535&r2=948536&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java 
Wed May 26 18:56:37 2010
@@ -24,7 +24,7 @@ import java.net.InetAddress;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
-import org.apache.cassandra.streaming.IStreamComplete;
+import org.apache.cassandra.streaming.FileStatusHandler;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +38,7 @@ class StreamInManager
     /* Maintain in this map the status of the streams that need to be sent 
back to the source */
     public static final Map<InetAddress, List<FileStatus>> streamStatusBag_ = 
new Hashtable<InetAddress, List<FileStatus>>();
     /* Maintains a callback handler per endpoint to notify the app that a 
stream from a given endpoint has been handled */
-    public static final Map<InetAddress, IStreamComplete> 
streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
+    public static final Map<InetAddress, FileStatusHandler> 
streamNotificationHandlers_ = new HashMap<InetAddress, FileStatusHandler>();
 
     public static final Multimap<InetAddress, PendingFile> activeStreams = 
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, PendingFile>create());
 
@@ -85,7 +85,7 @@ class StreamInManager
     }
 
     /*
-     * This method helps determine if the StreamCompletionHandler needs
+     * This method helps determine if the FileStatusHandler needs
      * to be invoked for the data being streamed from a source. 
     */
     public synchronized static boolean isDone(InetAddress key)
@@ -93,17 +93,17 @@ class StreamInManager
         return (ctxBag_.get(key) == null);
     }
     
-    public synchronized static IStreamComplete 
getStreamCompletionHandler(InetAddress key)
+    public synchronized static FileStatusHandler 
getFileStatusHandler(InetAddress key)
     {
         return streamNotificationHandlers_.get(key);
     }
     
-    public synchronized static void removeStreamCompletionHandler(InetAddress 
key)
+    public synchronized static void removeFileStatusHandler(InetAddress key)
     {
         streamNotificationHandlers_.remove(key);
     }
     
-    public synchronized static void 
registerStreamCompletionHandler(InetAddress key, IStreamComplete streamComplete)
+    public synchronized static void registerFileStatusHandler(InetAddress key, 
FileStatusHandler streamComplete)
     {
         streamNotificationHandlers_.put(key, streamComplete);
     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=948536&r1=948535&r2=948536&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
 Wed May 26 18:56:37 2010
@@ -86,7 +86,7 @@ public class StreamInitiateVerbHandler i
                 addStreamContext(message.getFrom(), localFile, streamStatus);
             }
 
-            StreamInManager.registerStreamCompletionHandler(message.getFrom(), 
new StreamCompletionHandler());
+            StreamInManager.registerFileStatusHandler(message.getFrom(), new 
FileStatusHandler());
             if (logger.isDebugEnabled())
               logger.debug("Sending a stream initiate done message ...");
             Message doneMessage = new Message(FBUtilities.getLocalAddress(), 
"", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0] );


Reply via email to