Updated Branches: refs/heads/trunk abea32e19 -> b7a016d99
Fix ConcurrentModificationException during streaming patch by slebresne; reviewed by yukim for CASSANDRA-5782 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7220e3a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7220e3a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7220e3a Branch: refs/heads/trunk Commit: c7220e3a7b8debd8fa348ad258bc0ae8a3cd8509 Parents: abea32e Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Mon Jul 22 15:43:08 2013 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Mon Jul 22 15:43:08 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/streaming/StreamSession.java | 5 +++-- src/java/org/apache/cassandra/streaming/StreamTransferTask.java | 4 +++- 3 files changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7220e3a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bbf2773..4d56c98 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -13,6 +13,7 @@ * CAS on 'primary key only' table (CASSANDRA-5715) * Support streaming SSTables of old versions (CASSANDRA-5772) * Always respect protocol version in native protocol (CASSANDRA-5778) + * Fix ConcurrentModificationException during streaming (CASSANDRA-5782) 2.0.0-beta1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7220e3a/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index c61b404..aeb4419 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ 
b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -620,8 +620,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe state(State.STREAMING); for (StreamTransferTask task : transfers.values()) { - if (task.getFileMessages().size() > 0) - handler.sendMessages(task.getFileMessages()); + Collection<FileMessage> messages = task.getFileMessages(); + if (messages.size() > 0) + handler.sendMessages(messages); else taskCompleted(task); // there is no file to send } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7220e3a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index ba2df03..956692d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -73,7 +73,9 @@ public class StreamTransferTask extends StreamTask public Collection<FileMessage> getFileMessages() { - return files.values(); + // We may race between queuing all those messages and the completion of + // the first ones. So copy the values to avoid a ConcurrentModificationException + return new ArrayList<>(files.values()); } public FileMessage createMessageForRetry(int sequenceNumber)