Repository: cassandra
Updated Branches:
  refs/heads/trunk 953a3729f -> d66022392


Make StreamSession#closeSession() idempotent

Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-7262


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/709b9fc3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/709b9fc3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/709b9fc3

Branch: refs/heads/trunk
Commit: 709b9fc319f669ee07338a842f36a9d77e8c46a1
Parents: e68ac31
Author: Marcus Eriksson <[email protected]>
Authored: Mon Jun 2 08:53:32 2014 +0200
Committer: Marcus Eriksson <[email protected]>
Committed: Mon Jun 2 08:58:11 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      | 32 ++++++++++++--------
 2 files changed, 20 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/709b9fc3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d1d1030..bc95a8d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
  * Don't try to compact already-compacting files in HHOM (CASSANDRA-7288)
  * Add authentication support to shuffle (CASSANDRA-6484)
  * Cqlsh counts non-empty lines for "Blank lines" warning (CASSANDRA-7325)
+ * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
 Merged from 1.2:
  * Fix availability validation for LOCAL_ONE CL (CASSANDRA-7319)
  * Use LOCAL_ONE for non-superuser auth queries (CASSANDRA-7328)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/709b9fc3/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 30e3fa2..79ad487 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.*;
 import org.slf4j.Logger;
@@ -132,6 +133,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
 
     private int retries;
 
+    private AtomicBoolean isAborted = new AtomicBoolean(false);
+
     public static enum State
     {
         INITIALIZED,
@@ -329,23 +332,26 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         }
     }
 
-    private void closeSession(State finalState)
+    private synchronized void closeSession(State finalState)
     {
-        state(finalState);
-
-        if (finalState == State.FAILED)
+        if (isAborted.compareAndSet(false, true))
         {
-            for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
-                task.abort();
-        }
+            state(finalState);
 
-        // Note that we shouldn't block on this close because this method is called on the handler
-        // incoming thread (so we would deadlock).
-        handler.close();
+            if (finalState == State.FAILED)
+            {
+                for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
+                    task.abort();
+            }
+
+            // Note that we shouldn't block on this close because this method is called on the handler
+            // incoming thread (so we would deadlock).
+            handler.close();
 
-        Gossiper.instance.unregister(this);
-        FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-        streamResult.handleSessionComplete(this);
+            Gossiper.instance.unregister(this);
+            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+            streamResult.handleSessionComplete(this);
+        }
     }
 
     /**
