Author: jbellis
Date: Tue Sep 14 22:26:13 2010
New Revision: 997122
URL: http://svn.apache.org/viewvc?rev=997122&view=rev
Log:
move FileStatusHandler methods to IncomingStreamReader
patch by jbellis; reviewed by Nick Bailey for CASSANDRA-1504
Removed:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=997122&r1=997121&r2=997122&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Tue Sep 14 22:26:13 2010
@@ -22,12 +22,16 @@ import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.io.*;
-import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Pair;
public class IncomingStreamReader
@@ -109,7 +113,7 @@ public class IncomingStreamReader
handleFileStatus(FileStatus.Action.DELETE);
else
{
- FileStatusHandler.addSSTable(pendingFile);
+ addSSTable(pendingFile);
session.finishAndRequestNext(lastFile);
}
}
@@ -117,6 +121,38 @@ public class IncomingStreamReader
private void handleFileStatus(FileStatus.Action action) throws IOException
{
streamStatus.setAction(action);
- FileStatusHandler.onStatusChange(session, pendingFile, streamStatus);
+
+ if (FileStatus.Action.STREAM == streamStatus.getAction())
+ {
+ // file needs to be restreamed
+ logger.warn("Streaming of file {} from {} failed: requesting a retry.", pendingFile, session);
+ MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), session.getHost());
+ return;
+ }
+ assert FileStatus.Action.DELETE == streamStatus.getAction() : "Unknown stream action: " + streamStatus.getAction();
+
+ addSSTable(pendingFile);
+
+ // send a StreamStatus message telling the source node it can delete this file
+ if (logger.isDebugEnabled())
+ logger.debug("Sending a streaming finished message for {} to {}", pendingFile, session);
+ MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), session.getHost());
+ }
+
+ public static void addSSTable(PendingFile pendingFile)
+ {
+ // file was successfully streamed
+ Descriptor desc = pendingFile.desc;
+ try
+ {
+ SSTableReader sstable = SSTableWriter.recoverAndOpen(pendingFile.desc);
+ Table.open(desc.ksname).getColumnFamilyStore(desc.cfname).addSSTable(sstable);
+ logger.info("Streaming added " + sstable);
+ }
+ catch (IOException e)
+ {
+ logger.error("Failed adding {}", pendingFile, e);
+ throw new RuntimeException("Not able to add streamed file " + pendingFile.getFilename(), e);
+ }
}
}