Author: jbellis
Date: Tue Sep 14 22:26:13 2010
New Revision: 997122

URL: http://svn.apache.org/viewvc?rev=997122&view=rev
Log:
move FileStatusHandler methods to IncomingStreamReader
patch by jbellis; reviewed by Nick Bailey for CASSANDRA-1504


Removed:
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
Modified:
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=997122&r1=997121&r2=997122&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Tue Sep 14 22:26:13 2010
@@ -22,12 +22,16 @@ import java.net.InetSocketAddress;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.io.*;
-import java.util.Arrays;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
 
 public class IncomingStreamReader
@@ -109,7 +113,7 @@ public class IncomingStreamReader
             handleFileStatus(FileStatus.Action.DELETE);
         else
         {
-            FileStatusHandler.addSSTable(pendingFile);
+            addSSTable(pendingFile);
             session.finishAndRequestNext(lastFile);
         }
     }
@@ -117,6 +121,38 @@ public class IncomingStreamReader
     private void handleFileStatus(FileStatus.Action action) throws IOException
     {
         streamStatus.setAction(action);
-        FileStatusHandler.onStatusChange(session, pendingFile, streamStatus);
+        
+        if (FileStatus.Action.STREAM == streamStatus.getAction())
+        {
+            // file needs to be restreamed
+            logger.warn("Streaming of file {} from {} failed: requesting a 
retry.", pendingFile, session);
+            
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), 
session.getHost());
+            return;
+        }
+        assert FileStatus.Action.DELETE == streamStatus.getAction() : "Unknown 
stream action: " + streamStatus.getAction();
+
+        addSSTable(pendingFile);
+
+        // send a StreamStatus message telling the source node it can delete 
this file
+        if (logger.isDebugEnabled())
+            logger.debug("Sending a streaming finished message for {} to {}", 
pendingFile, session);
+        
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), 
session.getHost());
+    }
+
+    public static void addSSTable(PendingFile pendingFile)
+    {
+        // file was successfully streamed
+        Descriptor desc = pendingFile.desc;
+        try
+        {
+            SSTableReader sstable = 
SSTableWriter.recoverAndOpen(pendingFile.desc);
+            
Table.open(desc.ksname).getColumnFamilyStore(desc.cfname).addSSTable(sstable);
+            logger.info("Streaming added " + sstable);
+        }
+        catch (IOException e)
+        {
+            logger.error("Failed adding {}", pendingFile, e);
+            throw new RuntimeException("Not able to add streamed file " + 
pendingFile.getFilename(), e);
+        }
     }
 }


Reply via email to