Updated Branches:
  refs/heads/trunk abea32e19 -> b7a016d99

Fix ConcurrentModificationException during streaming

patch by slebresne; reviewed by yukim for CASSANDRA-5782


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7220e3a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7220e3a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7220e3a

Branch: refs/heads/trunk
Commit: c7220e3a7b8debd8fa348ad258bc0ae8a3cd8509
Parents: abea32e
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Mon Jul 22 15:43:08 2013 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Mon Jul 22 15:43:08 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                                     | 1 +
 src/java/org/apache/cassandra/streaming/StreamSession.java      | 5 +++--
 src/java/org/apache/cassandra/streaming/StreamTransferTask.java | 4 +++-
 3 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7220e3a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbf2773..4d56c98 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,7 @@
  * CAS on 'primary key only' table (CASSANDRA-5715)
  * Support streaming SSTables of old versions (CASSANDRA-5772)
  * Always respect protocol version in native protocol (CASSANDRA-5778)
+ * Fix ConcurrentModificationException during streaming (CASSANDRA-5782)
 
 
 2.0.0-beta1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7220e3a/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index c61b404..aeb4419 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -620,8 +620,9 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
         state(State.STREAMING);
         for (StreamTransferTask task : transfers.values())
         {
-            if (task.getFileMessages().size() > 0)
-                handler.sendMessages(task.getFileMessages());
+            Collection<FileMessage> messages = task.getFileMessages();
+            if (messages.size() > 0)
+                handler.sendMessages(messages);
             else
                 taskCompleted(task); // there is no file to send
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7220e3a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java 
b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index ba2df03..956692d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -73,7 +73,9 @@ public class StreamTransferTask extends StreamTask
 
     public Collection<FileMessage> getFileMessages()
     {
-        return files.values();
+        // We may race between queuing all those messages and the completion 
of the completion of
+        // the first ones. So copy the values to avoid a 
ConcurrentModificationException
+        return new ArrayList<>(files.values());
     }
 
     public FileMessage createMessageForRetry(int sequenceNumber)

Reply via email to