Author: gdusbabek
Date: Wed May 26 18:56:45 2010
New Revision: 948537

URL: http://svn.apache.org/viewvc?rev=948537&view=rev
Log:
rename StreamCompletionAction to Action. patch by stuhood, reviewed by 
gdusbabek. CASSANDRA-1019

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java?rev=948537&r1=948536&r2=948537&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java Wed 
May 26 18:56:45 2010
@@ -35,9 +35,11 @@ class FileStatus
 {
     private static ICompactSerializer<FileStatus> serializer_;
 
-    public static enum StreamCompletionAction
+    static enum Action
     {
+        // was received successfully, and can be deleted from the source node
         DELETE,
+        // needs to be streamed (or restreamed)
         STREAM
     }
 
@@ -51,15 +53,18 @@ class FileStatus
         return serializer_;
     }
 
-    private String file_;
-    private long expectedBytes_;
-    private StreamCompletionAction action_;
-
+    private final String file_;
+    private final long expectedBytes_;
+    private Action action_;
+
+    /**
+     * Create a FileStatus with the default Action: STREAM.
+     */
     public FileStatus(String file, long expectedBytes)
     {
         file_ = file;
         expectedBytes_ = expectedBytes;
-        action_ = StreamCompletionAction.DELETE;
+        action_ = Action.STREAM;
     }
 
     public String getFile()
@@ -72,12 +77,12 @@ class FileStatus
         return expectedBytes_;
     }
 
-    public void setAction(StreamCompletionAction action)
+    public void setAction(Action action)
     {
         action_ = action;
     }
 
-    public StreamCompletionAction getAction()
+    public Action getAction()
     {
         return action_;
     }
@@ -106,14 +111,12 @@ class FileStatus
             FileStatus streamStatus = new FileStatus(targetFile, 
expectedBytes);
 
             int ordinal = dis.readInt();
-            if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
-            {
-                streamStatus.setAction(StreamCompletionAction.DELETE);
-            }
-            else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
-            {
-                streamStatus.setAction(StreamCompletionAction.STREAM);
-            }
+            if (ordinal == Action.DELETE.ordinal())
+                streamStatus.setAction(Action.DELETE);
+            else if (ordinal == Action.STREAM.ordinal())
+                streamStatus.setAction(Action.STREAM);
+            else
+                throw new IOException("Bad FileStatus.Action: " + ordinal);
 
             return streamStatus;
         }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=948537&r1=948536&r2=948537&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java 
Wed May 26 18:56:45 2010
@@ -42,16 +42,15 @@ class FileStatusHandler
 
     public void onStatusChange(InetAddress host, PendingFile pendingFile, 
FileStatus streamStatus) throws IOException
     {
-        if (FileStatus.StreamCompletionAction.STREAM == 
streamStatus.getAction())
+        if (FileStatus.Action.STREAM == streamStatus.getAction())
         {
             // file needs to be restreamed
-            logger.warn("Streaming of file " + pendingFile + " from " + host + 
" failed, but will be retried.");
-            // request that the source node re-stream the file
+            logger.warn("Streaming of file " + pendingFile + " from " + host + 
" failed: requesting a retry.");
             
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), 
host);
             return;
         }
-        assert FileStatus.StreamCompletionAction.DELETE == 
streamStatus.getAction() :
-            "Unknown stream status: " + streamStatus.getAction();
+        assert FileStatus.Action.DELETE == streamStatus.getAction() :
+            "Unknown stream action: " + streamStatus.getAction();
 
         // file was successfully streamed: if it was the last component of an 
sstable, assume that the rest
         // have already arrived

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=948537&r1=948536&r2=948537&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Wed May 26 18:56:45 2010
@@ -68,7 +68,7 @@ public class IncomingStreamReader
         catch (IOException ex)
         {
             /* Ask the source node to re-stream this file. */
-            streamStatus.setAction(FileStatus.StreamCompletionAction.STREAM);
+            streamStatus.setAction(FileStatus.Action.STREAM);
             handleFileStatus(remoteAddress.getAddress());
             /* Delete the orphaned file. */
             File file = new File(pendingFile.getFilename());
@@ -84,10 +84,9 @@ public class IncomingStreamReader
         if (bytesRead == pendingFile.getExpectedBytes())
         {
             if (logger.isDebugEnabled())
-            {
                 logger.debug("Removing stream context " + pendingFile);
-            }
             fc.close();
+            streamStatus.setAction(FileStatus.Action.DELETE);
             handleFileStatus(remoteAddress.getAddress());
         }
     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=948537&r1=948536&r2=948537&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
 Wed May 26 18:56:45 2010
@@ -53,13 +53,12 @@ public class StreamFinishedVerbHandler i
                     break;
 
                 case STREAM:
-                    if (logger.isDebugEnabled())
-                        logger.debug("Need to re-stream file " + 
streamStatus.getFile());
+                    logger.warn("Need to re-stream file " + 
streamStatus.getFile() + " to " + message.getFrom());
                     StreamOutManager.get(message.getFrom()).startNext();
                     break;
 
                 default:
-                    break;
+                    throw new RuntimeException("Cannot handle 
FileStatus.Action: " + streamStatus.getAction());
             }
         }
         catch (IOException ex)


Reply via email to