Author: jbellis
Date: Wed Sep 22 03:40:59 2010
New Revision: 999737
URL: http://svn.apache.org/viewvc?rev=999737&view=rev
Log:
clean up FileStatus and rename to StreamReply. updates comments in StreamOut
and StreamIn
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415
Added:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
- copied, changed from r999733,
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
- copied, changed from r999733,
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=999737&r1=999736&r2=999737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Sep 22 03:40:59 2010
@@ -28,26 +28,18 @@ import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.RawColumnDefinition;
-import org.apache.cassandra.config.RawColumnFamily;
-import org.apache.cassandra.config.RawKeyspace;
-import org.apache.cassandra.utils.SkipNullRepresenter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.migration.AddKeyspace;
import org.apache.cassandra.db.migration.Migration;
@@ -68,8 +60,8 @@ import org.apache.cassandra.streaming.*;
import org.apache.cassandra.thrift.Constants;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SkipNullRepresenter;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.log4j.Level;
import org.yaml.snakeyaml.Dumper;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
@@ -97,7 +89,7 @@ public class StorageService implements I
READ_RESPONSE,
STREAM_INITIATE, // Deprecated
STREAM_INITIATE_DONE, // Deprecated
- STREAM_STATUS,
+ STREAM_REPLY,
STREAM_REQUEST,
RANGE_SLICE,
BOOTSTRAP_TOKEN,
@@ -122,7 +114,7 @@ public class StorageService implements I
put(Verb.READ_REPAIR, Stage.MUTATION);
put(Verb.READ, Stage.READ);
put(Verb.READ_RESPONSE, Stage.RESPONSE);
- put(Verb.STREAM_STATUS, Stage.MISC); // TODO does this really belong
on misc? I've just copied old behavior here
+ put(Verb.STREAM_REPLY, Stage.MISC); // TODO does this really belong on
misc? I've just copied old behavior here
put(Verb.STREAM_REQUEST, Stage.STREAM);
put(Verb.RANGE_SLICE, Stage.READ);
put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
@@ -222,7 +214,7 @@ public class StorageService implements I
// see BootStrapper for a summary of how the bootstrap verbs interact
MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN,
new BootStrapper.BootstrapTokenVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST,
new StreamRequestVerbHandler() );
- MessagingService.instance.registerVerbHandlers(Verb.STREAM_STATUS, new
StreamStatusVerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.STREAM_REPLY, new
StreamReplyVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.READ_RESPONSE, new
ResponseVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new
TreeRequestVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new
AntiEntropyService.TreeResponseVerbHandler());
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=999737&r1=999736&r2=999737&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Wed Sep 22 03:40:59 2010
@@ -88,9 +88,10 @@ public class IncomingStreamReader
}
catch (IOException ex)
{
- logger.debug("Receiving stream: recovering from IO error");
/* Ask the source node to re-stream this file. */
- handleFileStatus(FileStatus.Action.RETRY);
+ StreamReply reply = new StreamReply(remoteFile.getFilename(),
session.getSessionId(), StreamReply.Status.FILE_RETRY);
+ logger.info("Streaming of file {} from {} failed: requesting a
retry.", remoteFile, session);
+ MessagingService.instance.sendOneWay(reply.createMessage(),
session.getHost());
/* Delete the orphaned file. */
FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
@@ -103,29 +104,14 @@ public class IncomingStreamReader
if (logger.isDebugEnabled())
logger.debug("Removing stream context {}", remoteFile);
- handleFileStatus(FileStatus.Action.FINISHED);
- }
-
- private void handleFileStatus(FileStatus.Action action) throws IOException
- {
- FileStatus status = new FileStatus(remoteFile.getFilename(),
session.getSessionId(), action);
-
- if (FileStatus.Action.RETRY == action)
- {
- // file needs to be restreamed
- logger.warn("Streaming of file {} from {} failed: requesting a
retry.", remoteFile, session);
-
MessagingService.instance.sendOneWay(status.makeStreamStatusMessage(),
session.getHost());
- return;
- }
-
- assert FileStatus.Action.FINISHED == action : "Unknown stream action:
" + action;
+ StreamReply reply = new StreamReply(remoteFile.getFilename(),
session.getSessionId(), StreamReply.Status.FILE_FINISHED);
addSSTable(localFile);
session.remove(remoteFile);
// send a StreamStatus message telling the source node it can delete
this file
if (logger.isDebugEnabled())
logger.debug("Sending a streaming finished message for {} to {}",
remoteFile, session);
- MessagingService.instance.sendOneWay(status.makeStreamStatusMessage(),
session.getHost());
+ MessagingService.instance.sendOneWay(reply.createMessage(),
session.getHost());
}
public static void addSSTable(PendingFile pendingFile)
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=999737&r1=999736&r2=999737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Wed
Sep 22 03:40:59 2010
@@ -37,7 +37,11 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
-/** for streaming data from other nodes in to this one */
+/**
+ * for streaming data from other nodes in to this one.
+ * Sends a STREAM_REQUEST Message to the source node(s), after which StreamOut
on that side takes over.
+ * See StreamOut for details.
+ */
public class StreamIn
{
private static Logger logger = LoggerFactory.getLogger(StreamIn.class);
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=999737&r1=999736&r2=999737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Wed
Sep 22 03:40:59 2010
@@ -40,12 +40,19 @@ import org.apache.cassandra.utils.Pair;
/**
* This class handles streaming data from one node to another.
*
- * For StreamingRepair and Unbootstrap
- * 1. The ranges are transferred on a single file basis.
- * 2. Each transfer has the header information for the sstable being
transferred.
- * 3. List of the pending files are maintained, as this is the source node.
+ * The source node is in charge of the streaming session. It begins the
stream by sending
+ * a Message with the stream bit flag in the Header turned on. Part of that
Message
+ * will include a StreamHeader that includes the files that will be streamed
as part
+ * of that session, as well as the first file-to-be-streamed. (Combining
session list
+ * and first file like this is inconvenient, but not as inconvenient as the old
+ * three-part send-file-list, wait-for-ack, start-first-file dance.)
*
- * For Stream requests (for bootstrap), the main difference is that we always
have to
+ * After each file, the target will send a StreamReply indicating success
+ * (FILE_FINISHED) or failure (FILE_RETRY).
+ *
+ * When all files have been successfully transferred the session is complete.
+ *
+ * For Stream requests (for bootstrap), one subtlety is that we always have to
* create at least one stream reply, even if the list of files is empty,
otherwise the
* target has no way to know that it can stop waiting for an answer.
*
Copied:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java (from
r999733,
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java)
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?p2=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java&p1=cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java&r1=999733&r2=999737&rev=999737&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
Wed Sep 22 03:40:59 2010
@@ -31,62 +31,52 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-class FileStatus
+class StreamReply
{
- private static ICompactSerializer<FileStatus> serializer;
-
- static enum Action
+ static enum Status
{
// was received successfully, and can be deleted from the source node
- FINISHED,
+ FILE_FINISHED,
// needs to be streamed (or restreamed)
- RETRY,
- }
-
- static
- {
- serializer = new FileStatusSerializer();
+ FILE_RETRY,
}
- public static ICompactSerializer<FileStatus> serializer()
- {
- return serializer;
- }
+ public static final ICompactSerializer<StreamReply> serializer = new
FileStatusSerializer();
public final long sessionId;
public final String file;
- public final Action action;
+ public final Status action;
- public FileStatus(String file, long sessionId, Action action)
+ public StreamReply(String file, long sessionId, Status action)
{
this.file = file;
this.action = action;
this.sessionId = sessionId;
}
- public Message makeStreamStatusMessage() throws IOException
+ public Message createMessage() throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
- FileStatus.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.STREAM_STATUS, bos.toByteArray());
+ serializer.serialize(this, dos);
+ return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.STREAM_REPLY, bos.toByteArray());
}
- private static class FileStatusSerializer implements
ICompactSerializer<FileStatus>
+ private static class FileStatusSerializer implements
ICompactSerializer<StreamReply>
{
- public void serialize(FileStatus streamStatus, DataOutputStream dos)
throws IOException
+ public void serialize(StreamReply reply, DataOutputStream dos) throws
IOException
{
- dos.writeLong(streamStatus.sessionId);
- dos.writeUTF(streamStatus.file);
- dos.writeInt(streamStatus.action.ordinal());
+ dos.writeLong(reply.sessionId);
+ dos.writeUTF(reply.file);
+ dos.writeInt(reply.action.ordinal());
}
- public FileStatus deserialize(DataInputStream dis) throws IOException
+ public StreamReply deserialize(DataInputStream dis) throws IOException
{
long sessionId = dis.readLong();
String targetFile = dis.readUTF();
- Action action = Action.values()[dis.readInt()];
- return new FileStatus(targetFile, sessionId, action);
+ Status action = Status.values()[dis.readInt()];
+ return new StreamReply(targetFile, sessionId, action);
}
}
}
Copied:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
(from r999733,
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java)
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?p2=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java&p1=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java&r1=999733&r2=999737&rev=999737&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamStatusVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
Wed Sep 22 03:40:59 2010
@@ -28,14 +28,13 @@ import java.io.IOException;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StreamStatusVerbHandler implements IVerbHandler
+public class StreamReplyVerbHandler implements IVerbHandler
{
- private static Logger logger =
LoggerFactory.getLogger(StreamStatusVerbHandler.class);
+ private static Logger logger =
LoggerFactory.getLogger(StreamReplyVerbHandler.class);
public void doVerb(Message message)
{
@@ -44,21 +43,21 @@ public class StreamStatusVerbHandler imp
try
{
- FileStatus streamStatus = FileStatus.serializer().deserialize(new
DataInputStream(bufIn));
- StreamOutSession session = StreamOutSession.get(message.getFrom(),
streamStatus.sessionId);
- session.validateCurrentFile(streamStatus.file);
+ StreamReply reply = StreamReply.serializer.deserialize(new
DataInputStream(bufIn));
+ StreamOutSession session = StreamOutSession.get(message.getFrom(),
reply.sessionId);
+ session.validateCurrentFile(reply.file);
- switch (streamStatus.action)
+ switch (reply.action)
{
- case FINISHED:
+ case FILE_FINISHED:
session.startNext();
break;
- case RETRY:
- logger.warn("Need to re-stream file {} to {}",
streamStatus.file, message.getFrom());
+ case FILE_RETRY:
+ logger.warn("Need to re-stream file {} to {}", reply.file,
message.getFrom());
session.retry();
break;
default:
- throw new RuntimeException("Cannot handle
FileStatus.Action: " + streamStatus.action);
+ throw new RuntimeException("Cannot handle
FileStatus.Action: " + reply.action);
}
}
catch (IOException ex)