Author: gdusbabek
Date: Wed May 26 18:56:45 2010
New Revision: 948537
URL: http://svn.apache.org/viewvc?rev=948537&view=rev
Log:
rename StreamCompletionAction to Action. patch by stuhood, reviewed by
gdusbabek. CASSANDRA-1019
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java?rev=948537&r1=948536&r2=948537&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java Wed
May 26 18:56:45 2010
@@ -35,9 +35,11 @@ class FileStatus
{
private static ICompactSerializer<FileStatus> serializer_;
- public static enum StreamCompletionAction
+ static enum Action
{
+ // was received successfully, and can be deleted from the source node
DELETE,
+ // needs to be streamed (or restreamed)
STREAM
}
@@ -51,15 +53,18 @@ class FileStatus
return serializer_;
}
- private String file_;
- private long expectedBytes_;
- private StreamCompletionAction action_;
-
+ private final String file_;
+ private final long expectedBytes_;
+ private Action action_;
+
+ /**
+ * Create a FileStatus with the default Action: STREAM.
+ */
public FileStatus(String file, long expectedBytes)
{
file_ = file;
expectedBytes_ = expectedBytes;
- action_ = StreamCompletionAction.DELETE;
+ action_ = Action.STREAM;
}
public String getFile()
@@ -72,12 +77,12 @@ class FileStatus
return expectedBytes_;
}
- public void setAction(StreamCompletionAction action)
+ public void setAction(Action action)
{
action_ = action;
}
- public StreamCompletionAction getAction()
+ public Action getAction()
{
return action_;
}
@@ -106,14 +111,12 @@ class FileStatus
FileStatus streamStatus = new FileStatus(targetFile,
expectedBytes);
int ordinal = dis.readInt();
- if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
- {
- streamStatus.setAction(StreamCompletionAction.DELETE);
- }
- else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
- {
- streamStatus.setAction(StreamCompletionAction.STREAM);
- }
+ if (ordinal == Action.DELETE.ordinal())
+ streamStatus.setAction(Action.DELETE);
+ else if (ordinal == Action.STREAM.ordinal())
+ streamStatus.setAction(Action.STREAM);
+ else
+ throw new IOException("Bad FileStatus.Action: " + ordinal);
return streamStatus;
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=948537&r1=948536&r2=948537&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
Wed May 26 18:56:45 2010
@@ -42,16 +42,15 @@ class FileStatusHandler
public void onStatusChange(InetAddress host, PendingFile pendingFile,
FileStatus streamStatus) throws IOException
{
- if (FileStatus.StreamCompletionAction.STREAM ==
streamStatus.getAction())
+ if (FileStatus.Action.STREAM == streamStatus.getAction())
{
// file needs to be restreamed
- logger.warn("Streaming of file " + pendingFile + " from " + host +
" failed, but will be retried.");
- // request that the source node re-stream the file
+ logger.warn("Streaming of file " + pendingFile + " from " + host +
" failed: requesting a retry.");
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(),
host);
return;
}
- assert FileStatus.StreamCompletionAction.DELETE ==
streamStatus.getAction() :
- "Unknown stream status: " + streamStatus.getAction();
+ assert FileStatus.Action.DELETE == streamStatus.getAction() :
+ "Unknown stream action: " + streamStatus.getAction();
// file was successfully streamed: if it was the last component of an
sstable, assume that the rest
// have already arrived
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=948537&r1=948536&r2=948537&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Wed May 26 18:56:45 2010
@@ -68,7 +68,7 @@ public class IncomingStreamReader
catch (IOException ex)
{
/* Ask the source node to re-stream this file. */
- streamStatus.setAction(FileStatus.StreamCompletionAction.STREAM);
+ streamStatus.setAction(FileStatus.Action.STREAM);
handleFileStatus(remoteAddress.getAddress());
/* Delete the orphaned file. */
File file = new File(pendingFile.getFilename());
@@ -84,10 +84,9 @@ public class IncomingStreamReader
if (bytesRead == pendingFile.getExpectedBytes())
{
if (logger.isDebugEnabled())
- {
logger.debug("Removing stream context " + pendingFile);
- }
fc.close();
+ streamStatus.setAction(FileStatus.Action.DELETE);
handleFileStatus(remoteAddress.getAddress());
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=948537&r1=948536&r2=948537&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
Wed May 26 18:56:45 2010
@@ -53,13 +53,12 @@ public class StreamFinishedVerbHandler i
break;
case STREAM:
- if (logger.isDebugEnabled())
- logger.debug("Need to re-stream file " +
streamStatus.getFile());
+ logger.warn("Need to re-stream file " +
streamStatus.getFile() + " to " + message.getFrom());
StreamOutManager.get(message.getFrom()).startNext();
break;
default:
- break;
+ throw new RuntimeException("Cannot handle
FileStatus.Action: " + streamStatus.getAction());
}
}
catch (IOException ex)