Fix duplicate SSTable reference when stream session failed; patch by yukim reviewed by Sylvain Lebresne for CASSANDRA-3306
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d401d9f1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d401d9f1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d401d9f1 Branch: refs/heads/trunk Commit: d401d9f1deaad08ff0a76e3795ec4b1c0fa72421 Parents: 99b245d Author: Yuki Morishita <[email protected]> Authored: Wed Oct 31 10:42:36 2012 -0500 Committer: Yuki Morishita <[email protected]> Committed: Wed Oct 31 10:42:36 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/streaming/IncomingStreamReader.java | 12 ++++++++++++ .../cassandra/streaming/StreamInSession.java | 6 ++++++ 3 files changed, 19 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d401d9f1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e69298f..5f5ea89 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ * fix compositeType.{get/from}String methods (CASSANDRA-4842) * (CQL) fix CREATE COLUMNFAMILY permissions check (CASSANDRA-4864) * Fix DynamicCompositeType same type comparison (CASSANDRA-4711) + * Fix duplicate SSTable reference when stream session failed (CASSANDRA-3306) 1.1.6 http://git-wip-us.apache.org/repos/asf/cassandra/blob/d401d9f1/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java index 915d3bc..bfb046f 100644 --- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java @@ -31,9 +31,11 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Table; import org.apache.cassandra.db.compaction.CompactionController; import org.apache.cassandra.db.compaction.PrecompactedRow; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.IColumnSerializer; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.net.OutboundTcpConnection; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.BytesReadTracker; @@ -60,6 +62,16 @@ public class IncomingStreamReader this.socket = socket; InetAddress host = header.broadcastAddress != null ? header.broadcastAddress : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress(); + if (header.pendingFiles.isEmpty() && header.file != null) + { + // StreamInSession should be created already when receiving 2nd and after files + if (!StreamInSession.hasSession(host, header.sessionId)) + { + StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE); + OutboundTcpConnection.write(reply.getMessage(Gossiper.instance.getVersion(host)), Long.toString(header.sessionId), new DataOutputStream(socket.getOutputStream())); + throw new IOException("Session " + header.sessionId + " already closed."); + } + } session = StreamInSession.get(host, header.sessionId); session.setSocket(socket); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d401d9f1/src/java/org/apache/cassandra/streaming/StreamInSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java index e123714..e1bbe41 100644 --- a/src/java/org/apache/cassandra/streaming/StreamInSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java @@ -102,6 +102,12 @@ public class StreamInSession extends AbstractStreamSession return session; } + public static boolean hasSession(InetAddress host, long sessionId) + { + Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId); + return sessions.get(context) != null; + } + public void setCurrentFile(PendingFile file) { this.current = file;
