6503 followup, make sure we send CompleteMessage when streaming is done. Patch by yukim, reviewed by marcuse for CASSANDRA-6503
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1141cdb0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1141cdb0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1141cdb0 Branch: refs/heads/trunk Commit: 1141cdb0c8122d3e990f29dc82576bededcbfd40 Parents: a8a12d9 Author: Marcus Eriksson <[email protected]> Authored: Fri Jan 31 11:55:02 2014 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Fri Jan 31 11:55:02 2014 +0100 ---------------------------------------------------------------------- .../org/apache/cassandra/streaming/StreamSession.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1141cdb0/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 4777995..7972183 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -144,6 +144,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe } private volatile State state = State.INITIALIZED; + private volatile boolean completeSent = false; /** * Create new streaming session with the peer. @@ -505,11 +506,15 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe { if (state == State.WAIT_COMPLETE) { + if (!completeSent) + { + handler.sendMessage(new CompleteMessage()); + completeSent = true; + } closeSession(State.COMPLETE); } else { - handler.sendMessage(new CompleteMessage()); state(State.WAIT_COMPLETE); } } @@ -594,12 +599,18 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe { if (state == State.WAIT_COMPLETE) { + if (!completeSent) + { + handler.sendMessage(new CompleteMessage()); + completeSent = true; + } closeSession(State.COMPLETE); } else { // notify peer that this session is completed handler.sendMessage(new CompleteMessage()); + completeSent = true; state(State.WAIT_COMPLETE); } }
