Author: gdusbabek
Date: Wed May 26 18:56:37 2010
New Revision: 948536
URL: http://svn.apache.org/viewvc?rev=948536&view=rev
Log:
rename StreamCompletionHandler to FileStatusHandler. patch by stuhood, reviwed
by gdusbabek. CASSANDRA-1019
Added:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
- copied, changed from r948535,
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
Copied:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
(from r948535,
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java)
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?p2=cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java&p1=cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java&r1=948535&r2=948536&rev=948536&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
Wed May 26 18:56:37 2010
@@ -30,35 +30,42 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.StreamInManager;
import org.apache.cassandra.service.StorageService;
/**
- * This is the callback handler that is invoked when we have
- * completely received a single file from a remote host.
- *
- * TODO if we move this into CFS we could make addSSTables private, improving
encapsulation.
+ * This is the callback handler that is invoked on the receiving node when a
file changes status from RECEIVE to either
+ * FileStatus.STREAM (needs to be restreamed) or FileStatus.DELETE
(successfully completed).
*/
-class StreamCompletionHandler implements IStreamComplete
+class FileStatusHandler
{
- private static Logger logger =
LoggerFactory.getLogger(StreamCompletionHandler.class);
+ private static Logger logger =
LoggerFactory.getLogger(FileStatusHandler.class);
- public void onStreamCompletion(InetAddress host, PendingFile pendingFile,
FileStatus streamStatus) throws IOException
+ public void onStatusChange(InetAddress host, PendingFile pendingFile,
FileStatus streamStatus) throws IOException
{
- /* Parse the stream context and the file to the list of SSTables in
the associated Column Family Store. */
- if (pendingFile.getFilename().contains("-Data.db"))
+ if (FileStatus.StreamCompletionAction.STREAM ==
streamStatus.getAction())
{
+ // file needs to be restreamed
+ logger.warn("Streaming of file " + pendingFile + " from " + host +
" failed, but will be retried.");
+ // request that the source node re-stream the file
+
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(),
host);
+ return;
+ }
+ assert FileStatus.StreamCompletionAction.DELETE ==
streamStatus.getAction() :
+ "Unknown stream status: " + streamStatus.getAction();
+
+ // file was successfully streamed: if it was the last component of an
sstable, assume that the rest
+ // have already arrived
+ if (pendingFile.getFilename().endsWith("-Data.db"))
+ {
+ // last component triggers add: see TODO in
SSTable.getAllComponents()
String tableName = pendingFile.getDescriptor().ksname;
- File file = new File( pendingFile.getFilename() );
+ File file = new File(pendingFile.getFilename());
String fileName = file.getName();
String [] temp = fileName.split("-");
- //Open the file to see if all parts are now here
try
{
SSTableReader sstable =
SSTableWriter.renameAndOpen(pendingFile.getFilename());
- //TODO add a sanity check that this sstable has all its parts
and is ok
Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
logger.info("Streaming added " + sstable.getFilename());
}
@@ -68,12 +75,12 @@ class StreamCompletionHandler implements
}
}
+ // send a StreamStatus message telling the source node it can delete
this file
if (logger.isDebugEnabled())
- logger.debug("Sending a streaming finished message with " +
streamStatus + " to " + host);
- /* Send a StreamStatus message which may require the source node to
re-stream certain files. */
+ logger.debug("Sending a streaming finished message for " +
pendingFile + " to " + host);
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(),
host);
- /* If we're done with everything for this host, remove from bootstrap
sources */
+ // if all files have been received from this host, remove from
bootstrap sources
if (StreamInManager.isDone(host) &&
StorageService.instance.isBootstrapMode())
{
StorageService.instance.removeBootstrapSource(host,
pendingFile.getDescriptor().ksname);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=948536&r1=948535&r2=948536&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Wed May 26 18:56:37 2010
@@ -69,7 +69,7 @@ public class IncomingStreamReader
{
/* Ask the source node to re-stream this file. */
streamStatus.setAction(FileStatus.StreamCompletionAction.STREAM);
- handleStreamCompletion(remoteAddress.getAddress());
+ handleFileStatus(remoteAddress.getAddress());
/* Delete the orphaned file. */
File file = new File(pendingFile.getFilename());
file.delete();
@@ -88,18 +88,18 @@ public class IncomingStreamReader
logger.debug("Removing stream context " + pendingFile);
}
fc.close();
- handleStreamCompletion(remoteAddress.getAddress());
+ handleFileStatus(remoteAddress.getAddress());
}
}
- private void handleStreamCompletion(InetAddress remoteHost) throws
IOException
+ private void handleFileStatus(InetAddress remoteHost) throws IOException
{
/*
* Streaming is complete. If all the data that has to be received
inform the sender via
* the stream completion callback so that the source may perform the
requisite cleanup.
*/
- IStreamComplete streamComplete =
StreamInManager.getStreamCompletionHandler(remoteHost);
- if (streamComplete != null)
- streamComplete.onStreamCompletion(remoteHost, pendingFile,
streamStatus);
+ FileStatusHandler handler =
StreamInManager.getFileStatusHandler(remoteHost);
+ if (handler != null)
+ handler.onStatusChange(remoteHost, pendingFile, streamStatus);
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=948536&r1=948535&r2=948536&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
Wed May 26 18:56:37 2010
@@ -24,7 +24,7 @@ import java.net.InetAddress;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
-import org.apache.cassandra.streaming.IStreamComplete;
+import org.apache.cassandra.streaming.FileStatusHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +38,7 @@ class StreamInManager
/* Maintain in this map the status of the streams that need to be sent
back to the source */
public static final Map<InetAddress, List<FileStatus>> streamStatusBag_ =
new Hashtable<InetAddress, List<FileStatus>>();
/* Maintains a callback handler per endpoint to notify the app that a
stream from a given endpoint has been handled */
- public static final Map<InetAddress, IStreamComplete>
streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
+ public static final Map<InetAddress, FileStatusHandler>
streamNotificationHandlers_ = new HashMap<InetAddress, FileStatusHandler>();
public static final Multimap<InetAddress, PendingFile> activeStreams =
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, PendingFile>create());
@@ -85,7 +85,7 @@ class StreamInManager
}
/*
- * This method helps determine if the StreamCompletionHandler needs
+ * This method helps determine if the FileStatusHandler needs
* to be invoked for the data being streamed from a source.
*/
public synchronized static boolean isDone(InetAddress key)
@@ -93,17 +93,17 @@ class StreamInManager
return (ctxBag_.get(key) == null);
}
- public synchronized static IStreamComplete
getStreamCompletionHandler(InetAddress key)
+ public synchronized static FileStatusHandler
getFileStatusHandler(InetAddress key)
{
return streamNotificationHandlers_.get(key);
}
- public synchronized static void removeStreamCompletionHandler(InetAddress
key)
+ public synchronized static void removeFileStatusHandler(InetAddress key)
{
streamNotificationHandlers_.remove(key);
}
- public synchronized static void
registerStreamCompletionHandler(InetAddress key, IStreamComplete streamComplete)
+ public synchronized static void registerFileStatusHandler(InetAddress key,
FileStatusHandler streamComplete)
{
streamNotificationHandlers_.put(key, streamComplete);
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=948536&r1=948535&r2=948536&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
Wed May 26 18:56:37 2010
@@ -86,7 +86,7 @@ public class StreamInitiateVerbHandler i
addStreamContext(message.getFrom(), localFile, streamStatus);
}
- StreamInManager.registerStreamCompletionHandler(message.getFrom(),
new StreamCompletionHandler());
+ StreamInManager.registerFileStatusHandler(message.getFrom(), new
FileStatusHandler());
if (logger.isDebugEnabled())
logger.debug("Sending a stream initiate done message ...");
Message doneMessage = new Message(FBUtilities.getLocalAddress(),
"", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0] );