Make StreamSession more thread-safe. Patch by sankalp kohli; reviewed by yukim for CASSANDRA-7092.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7484bd41 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7484bd41 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7484bd41 Branch: refs/heads/trunk Commit: 7484bd41918cc042642753f1ad1eaf468c6fc3af Parents: d48c797 Author: Yuki Morishita <yu...@apache.org> Authored: Fri May 9 10:40:50 2014 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Fri May 9 10:40:50 2014 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/streaming/StreamSession.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7484bd41/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 0ba41fb..30e3fa2 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -20,11 +20,9 @@ package org.apache.cassandra.streaming; import java.io.IOException; import java.net.InetAddress; import java.util.*; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; +import com.google.common.collect.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,11 +121,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe private StreamResultFuture streamResult; // stream requests to send to the peer - private final List<StreamRequest> requests = new ArrayList<>(); + private final Set<StreamRequest> requests = 
Sets.newConcurrentHashSet(); // streaming tasks are created and managed per ColumnFamily ID - private final Map<UUID, StreamTransferTask> transfers = new HashMap<>(); + private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>(); // data receivers, filled after receiving prepare message - private final Map<UUID, StreamReceiveTask> receivers = new HashMap<>(); + private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>(); private final StreamingMetrics metrics; public final ConnectionHandler handler;