Ensure unique streaming session id's Patch by Aaron Morton, reviewed by Yuki Morishita for CASSANDRA-4223
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2db61ab9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2db61ab9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2db61ab9 Branch: refs/heads/cassandra-1.1 Commit: 2db61ab9e0a5fe7f764236ec060ee370c19bd593 Parents: 0077edc Author: Aaron Morton <[email protected]> Authored: Fri May 18 15:17:39 2012 +1200 Committer: Aaron Morton <[email protected]> Committed: Fri May 18 15:17:39 2012 +1200 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../apache/cassandra/streaming/StreamHeader.java | 5 ++ .../cassandra/streaming/StreamInSession.java | 31 ++++++++++++++- .../cassandra/streaming/StreamOutSession.java | 29 +++++++++++++- 4 files changed, 65 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2db61ab9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1c1c184..d1d00f9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,6 @@ +1.0.11 + * ensure unique streaming session id's (CASSANDRA-4223) + 1.0.10 * fix maxTimestamp to include row tombstones (CASSANDRA-4116) * avoid streaming empty files with bulk loader if sstablewriter errors out http://git-wip-us.apache.org/repos/asf/cassandra/blob/2db61ab9/src/java/org/apache/cassandra/streaming/StreamHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamHeader.java b/src/java/org/apache/cassandra/streaming/StreamHeader.java index 95fcc0d..eb53a06 100644 --- a/src/java/org/apache/cassandra/streaming/StreamHeader.java +++ b/src/java/org/apache/cassandra/streaming/StreamHeader.java @@ -33,6 +33,11 @@ public class StreamHeader { private static IVersionedSerializer<StreamHeader> serializer; + // Streaming sessionId flags, used to avoid duplicate session id's between nodes. + // See StreamInSession and StreamOutSession + public static final int STREAM_IN_SOURCE_FLAG = 0; + public static final int STREAM_OUT_SOURCE_FLAG = 1; + static { serializer = new StreamHeaderSerializer(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2db61ab9/src/java/org/apache/cassandra/streaming/StreamInSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java index aa92d4d..feb82dd 100644 --- a/src/java/org/apache/cassandra/streaming/StreamInSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetAddress; import java.util.*; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Table; @@ -50,15 +51,38 @@ public class StreamInSession private final List<SSTableReader> readers = new ArrayList<SSTableReader>(); private PendingFile current; + private final static AtomicInteger sessionIdCounter = new AtomicInteger(0); + private StreamInSession(Pair<InetAddress, Long> context, Runnable callback) { this.context = context; this.callback = callback; } + /** + * The next session id is a combination of a local integer counter and a flag used to avoid collisions + * between session id's generated on different machines. Nodes can may have StreamOutSessions with the + * following contexts: + * + * <1.1.1.1, (stream_in_flag, 6)> + * <1.1.1.1, (stream_out_flag, 6)> + * + * The first is an out stream created in response to a request from node 1.1.1.1. The id (6) was created by + * the requesting node. The second is an out stream created by this node to push to 1.1.1.1. The id (6) was + * created by this node. + * + * Note: The StreamInSession results in a StreamOutSession on the target that uses the StreamInSession sessionId. + * + * @return next StreamInSession sessionId + */ + private static long nextSessionId() + { + return (((long)StreamHeader.STREAM_IN_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet()); + } + public static StreamInSession create(InetAddress host, Runnable callback) { - Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime()); + Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, nextSessionId()); StreamInSession session = new StreamInSession(context, callback); sessions.put(context, session); return session; @@ -168,6 +192,11 @@ public class StreamInSession } } + public int getSourceFlag() + { + return (int)(context.right >> 32); + } + public long getSessionId() { return context.right; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2db61ab9/src/java/org/apache/cassandra/streaming/StreamOutSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java index 3cbb294..4d09821 100644 --- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java @@ -23,6 +23,7 @@ import java.net.InetAddress; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -43,10 +44,29 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur // one host may have multiple stream sessions. private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession> streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>(); + private final static AtomicInteger sessionIdCounter = new AtomicInteger(0); + + /** + * The next session id is a combination of a local integer counter and a flag used to avoid collisions + * between session id's generated on different machines. Nodes can may have StreamOutSessions with the + * following contexts: + * + * <1.1.1.1, (stream_in_flag, 6)> + * <1.1.1.1, (stream_out_flag, 6)> + * + * The first is an out stream created in response to a request from node 1.1.1.1. The id (6) was created by + * the requesting node. The second is an out stream created by this node to push to 1.1.1.1. The id (6) was + * created by this node. + * @return next StreamOutSession sessionId + */ + private static long nextSessionId() + { + return (((long)StreamHeader.STREAM_OUT_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet()); + } public static StreamOutSession create(String table, InetAddress host, Runnable callback) { - return create(table, host, System.nanoTime(), callback); + return create(table, host, nextSessionId(), callback); } public static StreamOutSession create(String table, InetAddress host, long sessionId) @@ -84,6 +104,11 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur FailureDetector.instance.registerFailureDetectionEventListener(this); } + public int getSourceFlag() + { + return (int)(context.right >> 32); + } + public InetAddress getHost() { return context.left; @@ -138,7 +163,7 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur // time, if the endpoint die at the exact wrong time for instance. if (!isClosed.compareAndSet(false, true)) { - logger.debug("StreamOutSession {} already closed", getSessionId()); + logger.debug("StreamOutSession {} to {} already closed", getSessionId(), getHost()); return; }
