Author: jbellis
Date: Wed Sep 22 03:41:09 2010
New Revision: 999738
URL: http://svn.apache.org/viewvc?rev=999738&view=rev
Log:
add SESSION_FINISHED reply
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Wed Sep 22 03:41:09 2010
@@ -539,7 +539,6 @@ public class AntiEntropyService
{
StreamOutSession session =
StreamOutSession.create(request.cf.left, request.endpoint, null);
StreamOut.transferSSTables(session, sstables, ranges);
- session.close();
}
});
// request ranges from the remote node
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Wed Sep 22 03:41:09 2010
@@ -89,9 +89,7 @@ public class IncomingStreamReader
catch (IOException ex)
{
/* Ask the source node to re-stream this file. */
- StreamReply reply = new StreamReply(remoteFile.getFilename(),
session.getSessionId(), StreamReply.Status.FILE_RETRY);
- logger.info("Streaming of file {} from {} failed: requesting a
retry.", remoteFile, session);
- MessagingService.instance.sendOneWay(reply.createMessage(),
session.getHost());
+ session.retry(remoteFile);
/* Delete the orphaned file. */
FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
@@ -102,16 +100,8 @@ public class IncomingStreamReader
fc.close();
}
- if (logger.isDebugEnabled())
- logger.debug("Removing stream context {}", remoteFile);
-
- StreamReply reply = new StreamReply(remoteFile.getFilename(),
session.getSessionId(), StreamReply.Status.FILE_FINISHED);
addSSTable(localFile);
- session.remove(remoteFile);
- // send a StreamStatus message telling the source node it can delete
this file
- if (logger.isDebugEnabled())
- logger.debug("Sending a streaming finished message for {} to {}",
remoteFile, session);
- MessagingService.instance.sendOneWay(reply.createMessage(),
session.getHost());
+ session.finished(remoteFile);
}
public static void addSSTable(PendingFile pendingFile)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
Wed Sep 22 03:41:09 2010
@@ -18,12 +18,12 @@
package org.apache.cassandra.streaming;
+import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.net.MessagingService;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.apache.cassandra.utils.Pair;
@@ -87,15 +87,31 @@ public class StreamInSession
}
}
- public void remove(PendingFile file)
+ public void finished(PendingFile remoteFile) throws IOException
{
- files.remove(file);
+ if (logger.isDebugEnabled())
+ logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
+ files.remove(remoteFile);
+ StreamReply reply = new StreamReply(remoteFile.getFilename(),
getSessionId(), StreamReply.Status.FILE_FINISHED);
+ // send a StreamReply message telling the source node it can delete
this file
+ MessagingService.instance.sendOneWay(reply.createMessage(), getHost());
}
- public void closeIfFinished()
+ public void retry(PendingFile remoteFile) throws IOException
+ {
+ StreamReply reply = new StreamReply(remoteFile.getFilename(),
getSessionId(), StreamReply.Status.FILE_RETRY);
+ logger.info("Streaming of file {} from {} failed: requesting a
retry.", remoteFile, this);
+ MessagingService.instance.sendOneWay(reply.createMessage(), getHost());
+ }
+
+ public void closeIfFinished() throws IOException
{
if (files.isEmpty())
{
+ StreamReply reply = new StreamReply("", getSessionId(),
StreamReply.Status.SESSION_FINISHED);
+ logger.info("Finished streaming session {} from {}",
getSessionId(), getHost());
+ MessagingService.instance.sendOneWay(reply.createMessage(),
getHost());
+
if (callback != null)
callback.run();
sessions.remove(context);
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Wed
Sep 22 03:41:09 2010
@@ -50,7 +50,8 @@ import org.apache.cassandra.utils.Pair;
* After each file, the target will send a StreamReply indicating success
* (FILE_FINISHED) or failure (FILE_RETRY).
*
- * When all files have been successfully transferred the session is complete.
+ * When all files have been successfully transferred and integrated the target
will send
+ * SESSION_FINISHED and the session is complete.
*
* For Stream requests (for bootstrap), one subtlety is that we always have to
* create at least one stream reply, even if the list of files is empty,
otherwise the
@@ -83,10 +84,6 @@ public class StreamOut
{
throw new IOError(e);
}
- finally
- {
- session.close();
- }
}
/**
@@ -150,10 +147,6 @@ public class StreamOut
{
session.addFilesToStream(pending);
session.begin();
-
- logger.info("Waiting for transfer to {} to complete",
session.getHost());
- session.waitForStreamCompletion();
- logger.info("Done with transfer to {}", session.getHost());
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
Wed Sep 22 03:41:09 2010
@@ -69,7 +69,6 @@ public class StreamOutSession
public final String table;
private final Pair<InetAddress, Long> context;
- private final SimpleCondition condition = new SimpleCondition();
private final Runnable callback;
private String currentFile;
@@ -92,8 +91,6 @@ public class StreamOutSession
public void addFilesToStream(List<PendingFile> pendingFiles)
{
- // reset the condition in case this SOM is getting reused before it
can be removed.
- condition.reset();
for (PendingFile pendingFile : pendingFiles)
{
if (logger.isDebugEnabled())
@@ -117,19 +114,11 @@ public class StreamOutSession
public void startNext() throws IOException
{
+ assert files.containsKey(currentFile);
files.remove(currentFile);
-
- if (files.isEmpty())
- {
- if (logger.isDebugEnabled())
- logger.debug("Signalling that streaming is done for {} session
{}", getHost(), getSessionId());
- close();
- condition.signalAll();
- }
- else
- {
- streamFile(files.values().iterator().next());
- }
+ Iterator<PendingFile> iter = files.values().iterator();
+ if (iter.hasNext())
+ streamFile(iter.next());
}
public void close()
@@ -139,23 +128,11 @@ public class StreamOutSession
callback.run();
}
- public void removePending(PendingFile pf)
- {
- files.remove(pf.getFilename());
- if (files.isEmpty())
- close();
- }
-
- public void waitForStreamCompletion()
+ /** convenience method for use when testing */
+ void await() throws InterruptedException
{
- try
- {
- condition.await();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ while (streams.containsKey(context))
+ Thread.sleep(10);
}
Collection<PendingFile> getFiles()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
Wed Sep 22 03:41:09 2010
@@ -35,10 +35,9 @@ class StreamReply
{
static enum Status
{
- // was received successfully, and can be deleted from the source node
FILE_FINISHED,
- // needs to be streamed (or restreamed)
FILE_RETRY,
+ SESSION_FINISHED,
}
public static final ICompactSerializer<StreamReply> serializer = new
FileStatusSerializer();
@@ -62,6 +61,16 @@ class StreamReply
return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.STREAM_REPLY, bos.toByteArray());
}
+ @Override
+ public String toString()
+ {
+ return "StreamReply(" +
+ "sessionId=" + sessionId +
+ ", file='" + file + '\'' +
+ ", action=" + action +
+ ')';
+ }
+
private static class FileStatusSerializer implements
ICompactSerializer<StreamReply>
{
public void serialize(StreamReply reply, DataOutputStream dos) throws
IOException
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
Wed Sep 22 03:41:09 2010
@@ -44,18 +44,23 @@ public class StreamReplyVerbHandler impl
try
{
StreamReply reply = StreamReply.serializer.deserialize(new
DataInputStream(bufIn));
+ logger.debug("Received StreamReply {}", reply);
StreamOutSession session = StreamOutSession.get(message.getFrom(),
reply.sessionId);
- session.validateCurrentFile(reply.file);
switch (reply.action)
{
case FILE_FINISHED:
+ session.validateCurrentFile(reply.file);
session.startNext();
break;
case FILE_RETRY:
- logger.warn("Need to re-stream file {} to {}", reply.file,
message.getFrom());
+ session.validateCurrentFile(reply.file);
+ logger.info("Need to re-stream file {} to {}", reply.file,
message.getFrom());
session.retry();
break;
+ case SESSION_FINISHED:
+ session.close();
+ break;
default:
throw new RuntimeException("Cannot handle
FileStatus.Action: " + reply.action);
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Wed Sep 22 03:41:09 2010
@@ -66,7 +66,9 @@ public class StreamingTransferTest exten
List<Range> ranges = new ArrayList<Range>();
ranges.add(new Range(p.getMinimumToken(),
p.getToken("key".getBytes())));
ranges.add(new Range(p.getToken("key2".getBytes()),
p.getMinimumToken()));
- StreamOut.transferSSTables(StreamOutSession.create(tablename, LOCAL,
null), Arrays.asList(sstable), ranges);
+ StreamOutSession session = StreamOutSession.create(tablename, LOCAL,
null);
+ StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges);
+ session.await();
// confirm that the SSTable was transferred and registered
ColumnFamilyStore cfstore =
Table.open(tablename).getColumnFamilyStore(cfname);
@@ -106,7 +108,9 @@ public class StreamingTransferTest exten
List<Range> ranges = new ArrayList<Range>();
ranges.add(new Range(p.getMinimumToken(),
p.getToken("transfer1".getBytes())));
ranges.add(new Range(p.getToken("test2".getBytes()),
p.getMinimumToken()));
- StreamOut.transferSSTables(StreamOutSession.create(tablename, LOCAL,
null), Arrays.asList(sstable, sstable2), ranges);
+ StreamOutSession session = StreamOutSession.create(tablename, LOCAL,
null);
+ StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2),
ranges);
+ session.await();
// confirm that the SSTable was transferred and registered
ColumnFamilyStore cfstore =
Table.open(tablename).getColumnFamilyStore(cfname);