Author: jbellis
Date: Wed Sep 22 03:41:09 2010
New Revision: 999738

URL: http://svn.apache.org/viewvc?rev=999738&view=rev
Log:
add SESSION_FINISHED reply
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415


Modified:
    
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
    
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
Wed Sep 22 03:41:09 2010
@@ -539,7 +539,6 @@ public class AntiEntropyService
                     {
                         StreamOutSession session = 
StreamOutSession.create(request.cf.left, request.endpoint, null);
                         StreamOut.transferSSTables(session, sstables, ranges);
-                        session.close();
                     }
                 });
                 // request ranges from the remote node

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Wed Sep 22 03:41:09 2010
@@ -89,9 +89,7 @@ public class IncomingStreamReader
         catch (IOException ex)
         {
             /* Ask the source node to re-stream this file. */
-            StreamReply reply = new StreamReply(remoteFile.getFilename(), 
session.getSessionId(), StreamReply.Status.FILE_RETRY);
-            logger.info("Streaming of file {} from {} failed: requesting a 
retry.", remoteFile, session);
-            MessagingService.instance.sendOneWay(reply.createMessage(), 
session.getHost());
+            session.retry(remoteFile);
 
             /* Delete the orphaned file. */
             FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
@@ -102,16 +100,8 @@ public class IncomingStreamReader
             fc.close();
         }
 
-        if (logger.isDebugEnabled())
-            logger.debug("Removing stream context {}", remoteFile);
-
-        StreamReply reply = new StreamReply(remoteFile.getFilename(), 
session.getSessionId(), StreamReply.Status.FILE_FINISHED);
         addSSTable(localFile);
-        session.remove(remoteFile);
-        // send a StreamStatus message telling the source node it can delete 
this file
-        if (logger.isDebugEnabled())
-            logger.debug("Sending a streaming finished message for {} to {}", 
remoteFile, session);
-        MessagingService.instance.sendOneWay(reply.createMessage(), 
session.getHost());
+        session.finished(remoteFile);
     }
 
     public static void addSSTable(PendingFile pendingFile)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java 
Wed Sep 22 03:41:09 2010
@@ -18,12 +18,12 @@
 
 package org.apache.cassandra.streaming;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.net.MessagingService;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.apache.cassandra.utils.Pair;
 
@@ -87,15 +87,31 @@ public class StreamInSession
         }
     }
 
-    public void remove(PendingFile file)
+    public void finished(PendingFile remoteFile) throws IOException
     {
-        files.remove(file);
+        if (logger.isDebugEnabled())
+            logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
+        files.remove(remoteFile);
+        StreamReply reply = new StreamReply(remoteFile.getFilename(), 
getSessionId(), StreamReply.Status.FILE_FINISHED);
+        // send a StreamStatus message telling the source node it can delete 
this file
+        MessagingService.instance.sendOneWay(reply.createMessage(), getHost());
     }
 
-    public void closeIfFinished()
+    public void retry(PendingFile remoteFile) throws IOException
+    {
+        StreamReply reply = new StreamReply(remoteFile.getFilename(), 
getSessionId(), StreamReply.Status.FILE_RETRY);
+        logger.info("Streaming of file {} from {} failed: requesting a 
retry.", remoteFile, this);
+        MessagingService.instance.sendOneWay(reply.createMessage(), getHost());
+    }
+
+    public void closeIfFinished() throws IOException
     {
         if (files.isEmpty())
         {
+            StreamReply reply = new StreamReply("", getSessionId(), 
StreamReply.Status.SESSION_FINISHED);
+            logger.info("Finished streaming session {} from {}", 
getSessionId(), getHost());
+            MessagingService.instance.sendOneWay(reply.createMessage(), 
getHost());
+
             if (callback != null)
                 callback.run();
             sessions.remove(context);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Wed 
Sep 22 03:41:09 2010
@@ -50,7 +50,8 @@ import org.apache.cassandra.utils.Pair;
  * After each file, the target will send a StreamReply indicating success
  * (FILE_FINISHED) or failure (FILE_RETRY).
  *
- * When all files have been successfully transferred the session is complete.
+ * When all files have been successfully transferred and integrated the source 
will send
+ * SESSION_FINISHED and the session is complete.
  *
  * For Stream requests (for bootstrap), one subtlety is that we always have to
  * create at least one stream reply, even if the list of files is empty, 
otherwise the
@@ -83,10 +84,6 @@ public class StreamOut
         {
             throw new IOError(e);
         }
-        finally
-        {
-            session.close();
-        }
     }
 
     /**
@@ -150,10 +147,6 @@ public class StreamOut
         {
             session.addFilesToStream(pending);
             session.begin();
-
-            logger.info("Waiting for transfer to {} to complete", 
session.getHost());
-            session.waitForStreamCompletion();
-            logger.info("Done with transfer to {}", session.getHost());
         }
     }
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java 
Wed Sep 22 03:41:09 2010
@@ -69,7 +69,6 @@ public class StreamOutSession
 
     public final String table;
     private final Pair<InetAddress, Long> context;
-    private final SimpleCondition condition = new SimpleCondition();
     private final Runnable callback;
     private String currentFile;
 
@@ -92,8 +91,6 @@ public class StreamOutSession
     
     public void addFilesToStream(List<PendingFile> pendingFiles)
     {
-        // reset the condition in case this SOM is getting reused before it 
can be removed.
-        condition.reset();
         for (PendingFile pendingFile : pendingFiles)
         {
             if (logger.isDebugEnabled())
@@ -117,19 +114,11 @@ public class StreamOutSession
 
     public void startNext() throws IOException
     {
+        assert files.containsKey(currentFile);
         files.remove(currentFile);
-        
-        if (files.isEmpty())
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("Signalling that streaming is done for {} session 
{}", getHost(), getSessionId());
-            close();
-            condition.signalAll();
-        }
-        else
-        {
-            streamFile(files.values().iterator().next());
-        }
+        Iterator<PendingFile> iter = files.values().iterator();
+        if (iter.hasNext())
+            streamFile(iter.next());
     }
 
     public void close()
@@ -139,23 +128,11 @@ public class StreamOutSession
             callback.run();
     }
 
-    public void removePending(PendingFile pf)
-    {
-        files.remove(pf.getFilename());
-        if (files.isEmpty())
-            close();
-    }
-
-    public void waitForStreamCompletion()
+    /** convenience method for use when testing */
+    void await() throws InterruptedException
     {
-        try
-        {
-            condition.await();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        while (streams.containsKey(context))
+            Thread.sleep(10);
     }
 
     Collection<PendingFile> getFiles()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java 
Wed Sep 22 03:41:09 2010
@@ -35,10 +35,9 @@ class StreamReply
 {
     static enum Status
     {
-        // was received successfully, and can be deleted from the source node
         FILE_FINISHED,
-        // needs to be streamed (or restreamed)
         FILE_RETRY,
+        SESSION_FINISHED,
     }
 
     public static final ICompactSerializer<StreamReply> serializer = new 
FileStatusSerializer();
@@ -62,6 +61,16 @@ class StreamReply
         return new Message(FBUtilities.getLocalAddress(), 
StorageService.Verb.STREAM_REPLY, bos.toByteArray());
     }
 
+    @Override
+    public String toString()
+    {
+        return "StreamReply(" +
+               "sessionId=" + sessionId +
+               ", file='" + file + '\'' +
+               ", action=" + action +
+               ')';
+    }
+
     private static class FileStatusSerializer implements 
ICompactSerializer<StreamReply>
     {
         public void serialize(StreamReply reply, DataOutputStream dos) throws 
IOException

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
 Wed Sep 22 03:41:09 2010
@@ -44,18 +44,23 @@ public class StreamReplyVerbHandler impl
         try
         {
             StreamReply reply = StreamReply.serializer.deserialize(new 
DataInputStream(bufIn));
+            logger.debug("Received StreamReply {}", reply);
             StreamOutSession session = StreamOutSession.get(message.getFrom(), 
reply.sessionId);
-            session.validateCurrentFile(reply.file);
 
             switch (reply.action)
             {
                 case FILE_FINISHED:
+                    session.validateCurrentFile(reply.file);
                     session.startNext();
                     break;
                 case FILE_RETRY:
-                    logger.warn("Need to re-stream file {} to {}", reply.file, 
message.getFrom());
+                    session.validateCurrentFile(reply.file);
+                    logger.info("Need to re-stream file {} to {}", reply.file, 
message.getFrom());
                     session.retry();
                     break;
+                case SESSION_FINISHED:
+                    session.close();
+                    break;
                 default:
                     throw new RuntimeException("Cannot handle 
FileStatus.Action: " + reply.action);
             }

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=999738&r1=999737&r2=999738&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
 Wed Sep 22 03:41:09 2010
@@ -66,7 +66,9 @@ public class StreamingTransferTest exten
         List<Range> ranges = new ArrayList<Range>();
         ranges.add(new Range(p.getMinimumToken(), 
p.getToken("key".getBytes())));
         ranges.add(new Range(p.getToken("key2".getBytes()), 
p.getMinimumToken()));
-        StreamOut.transferSSTables(StreamOutSession.create(tablename, LOCAL, 
null), Arrays.asList(sstable), ranges);
+        StreamOutSession session = StreamOutSession.create(tablename, LOCAL, 
null);
+        StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges);
+        session.await();
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = 
Table.open(tablename).getColumnFamilyStore(cfname);
@@ -106,7 +108,9 @@ public class StreamingTransferTest exten
         List<Range> ranges = new ArrayList<Range>();
         ranges.add(new Range(p.getMinimumToken(), 
p.getToken("transfer1".getBytes())));
         ranges.add(new Range(p.getToken("test2".getBytes()), 
p.getMinimumToken()));
-        StreamOut.transferSSTables(StreamOutSession.create(tablename, LOCAL, 
null), Arrays.asList(sstable, sstable2), ranges);
+        StreamOutSession session = StreamOutSession.create(tablename, LOCAL, 
null);
+        StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), 
ranges);
+        session.await();
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = 
Table.open(tablename).getColumnFamilyStore(cfname);


Reply via email to