Fix duplicate SSTable reference when stream session failed; patch by yukim 
reviewed by Sylvain Lebresne for CASSANDRA-3306


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d401d9f1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d401d9f1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d401d9f1

Branch: refs/heads/cassandra-1.1
Commit: d401d9f1deaad08ff0a76e3795ec4b1c0fa72421
Parents: 99b245d
Author: Yuki Morishita <[email protected]>
Authored: Wed Oct 31 10:42:36 2012 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Wed Oct 31 10:42:36 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/streaming/IncomingStreamReader.java  |   12 ++++++++++++
 .../cassandra/streaming/StreamInSession.java       |    6 ++++++
 3 files changed, 19 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d401d9f1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e69298f..5f5ea89 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * fix compositeType.{get/from}String methods (CASSANDRA-4842)
  * (CQL) fix CREATE COLUMNFAMILY permissions check (CASSANDRA-4864)
  * Fix DynamicCompositeType same type comparison (CASSANDRA-4711)
+ * Fix duplicate SSTable reference when stream session failed (CASSANDRA-3306)
 
 
 1.1.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d401d9f1/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java 
b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index 915d3bc..bfb046f 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -31,9 +31,11 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.OutboundTcpConnection;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
@@ -60,6 +62,16 @@ public class IncomingStreamReader
         this.socket = socket;
         InetAddress host = header.broadcastAddress != null ? 
header.broadcastAddress
                            : 
((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
+        if (header.pendingFiles.isEmpty() && header.file != null)
+        {
+            // StreamInSession should be created already when receiving 2nd 
and after files
+            if (!StreamInSession.hasSession(host, header.sessionId))
+            {
+                StreamReply reply = new StreamReply("", header.sessionId, 
StreamReply.Status.SESSION_FAILURE);
+                
OutboundTcpConnection.write(reply.getMessage(Gossiper.instance.getVersion(host)),
 Long.toString(header.sessionId), new 
DataOutputStream(socket.getOutputStream()));
+                throw new IOException("Session " + header.sessionId + " 
already closed.");
+            }
+        }
         session = StreamInSession.get(host, header.sessionId);
         session.setSocket(socket);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d401d9f1/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java 
b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index e123714..e1bbe41 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -102,6 +102,12 @@ public class StreamInSession extends AbstractStreamSession
         return session;
     }
 
+    public static boolean hasSession(InetAddress host, long sessionId)
+    {
+        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, 
sessionId);
+        return sessions.get(context) != null;
+    }
+
     public void setCurrentFile(PendingFile file)
     {
         this.current = file;

Reply via email to