Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 561293d13 -> ddca610c9 refs/heads/trunk 6041d41cd -> 0a80fe4b5
Fix race condition in StreamTransferTask that could lead to infinite loops and premature sstable deletion. Patch by benedict; reviewed by yukim for CASSANDRA-7704. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ddca610c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ddca610c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ddca610c Branch: refs/heads/cassandra-2.1 Commit: ddca610c9c82e9bf527d4f57a814af06ed2a7cb3 Parents: 561293d Author: Benedict Elliott Smith <bened...@apache.org> Authored: Wed Jan 7 19:44:00 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Wed Jan 7 20:03:17 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cassandra/streaming/StreamTransferTask.java | 83 +++++++++++--------- .../streaming/StreamTransferTaskTest.java | 17 +++- 3 files changed, 63 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddca610c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dfed732..dac555b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -50,6 +50,8 @@ * Log failed host when preparing incremental repair (CASSANDRA-8228) * Force config client mode in CQLSSTableWriter (CASSANDRA-8281) Merged from 2.0: + * Fix race condition in StreamTransferTask that could lead to + infinite loops and premature sstable deletion (CASSANDRA-7704) * Add an extra version check to MigrationTask (CASSANDRA-8462) * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499) * Increase bf true positive count on key cache hit (CASSANDRA-8525) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddca610c/src/java/org/apache/cassandra/streaming/StreamTransferTask.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 48a7d89..b840ee5 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -19,9 +19,10 @@ package org.apache.cassandra.streaming; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.streaming.messages.OutgoingFileMessage; import org.apache.cassandra.utils.Pair; @@ -31,14 +32,13 @@ import org.apache.cassandra.utils.Pair; */ public class StreamTransferTask extends StreamTask { - private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); + private static final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StreamingTransferTaskTimeouts")); private final AtomicInteger sequenceNumber = new AtomicInteger(0); - private AtomicBoolean aborted = new AtomicBoolean(false); + private boolean aborted = false; - private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>(); - - private final Map<Integer, ScheduledFuture> timeoutTasks = new ConcurrentHashMap<>(); + private final Map<Integer, OutgoingFileMessage> files = new HashMap<>(); + private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>(); private long totalSize; @@ -47,7 +47,7 @@ public class StreamTransferTask extends StreamTask super(session, cfId); } - public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) + public 
synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) { assert sstable != null && cfId.equals(sstable.metadata.cfId); OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt); @@ -60,35 +60,42 @@ public class StreamTransferTask extends StreamTask * * @param sequenceNumber sequence number of file */ - public synchronized void complete(int sequenceNumber) + public void complete(int sequenceNumber) { - OutgoingFileMessage file = files.remove(sequenceNumber); - if (file != null) + boolean signalComplete; + synchronized (this) { - file.sstable.releaseReference(); - // all file sent, notify session this task is complete. - if (files.isEmpty()) - { - timeoutExecutor.shutdownNow(); - session.taskCompleted(this); - } + ScheduledFuture timeout = timeoutTasks.remove(sequenceNumber); + if (timeout != null) + timeout.cancel(false); + + OutgoingFileMessage file = files.remove(sequenceNumber); + if (file != null) + file.sstable.releaseReference(); + + signalComplete = files.isEmpty(); } + + // all file sent, notify session this task is complete. 
+ if (signalComplete) + session.taskCompleted(this); } - public void abort() + public synchronized void abort() { - // Prevent releasing reference multiple times - if (aborted.compareAndSet(false, true)) - { - for (OutgoingFileMessage file : files.values()) - { - file.sstable.releaseReference(); - } - timeoutExecutor.shutdownNow(); - } + if (aborted) + return; + aborted = true; + + for (ScheduledFuture future : timeoutTasks.values()) + future.cancel(false); + timeoutTasks.clear(); + + for (OutgoingFileMessage file : files.values()) + file.sstable.releaseReference(); } - public int getTotalNumberOfFiles() + public synchronized int getTotalNumberOfFiles() { return files.size(); } @@ -98,17 +105,17 @@ public class StreamTransferTask extends StreamTask return totalSize; } - public Collection<OutgoingFileMessage> getFileMessages() + public synchronized Collection<OutgoingFileMessage> getFileMessages() { // We may race between queuing all those messages and the completion of the completion of - // the first ones. So copy the values to avoid a ConcurrentModificationException + // the first ones. 
So copy tthe values to avoid a ConcurrentModificationException return new ArrayList<>(files.values()); } public synchronized OutgoingFileMessage createMessageForRetry(int sequenceNumber) { // remove previous time out task to be rescheduled later - ScheduledFuture future = timeoutTasks.get(sequenceNumber); + ScheduledFuture future = timeoutTasks.remove(sequenceNumber); if (future != null) future.cancel(false); return files.get(sequenceNumber); @@ -126,18 +133,24 @@ public class StreamTransferTask extends StreamTask */ public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit) { - if (timeoutExecutor.isShutdown()) + if (!files.containsKey(sequenceNumber)) return null; ScheduledFuture future = timeoutExecutor.schedule(new Runnable() { public void run() { - StreamTransferTask.this.complete(sequenceNumber); - timeoutTasks.remove(sequenceNumber); + synchronized (StreamTransferTask.this) + { + // remove so we don't cancel ourselves + timeoutTasks.remove(sequenceNumber); + StreamTransferTask.this.complete(sequenceNumber); + } } }, time, unit); - timeoutTasks.put(sequenceNumber, future); + + ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future); + assert prev == null; return future; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddca610c/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 3e73b24..a528f10 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -20,11 +20,14 @@ package org.apache.cassandra.streaming; import java.net.InetAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CancellationException; +import 
java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.junit.Test; +import junit.framework.Assert; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -66,14 +69,20 @@ public class StreamTransferTaskTest extends SchemaLoader assertEquals(2, task.getTotalNumberOfFiles()); // if file sending completes before timeout then the task should be canceled. - ScheduledFuture f = task.scheduleTimeout(0, 1, TimeUnit.SECONDS); - task.complete(0); - // timeout task may run after complete but it is noop + Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS); f.get(); // when timeout runs on second file, task should be completed f = task.scheduleTimeout(1, 1, TimeUnit.MILLISECONDS); - f.get(); + task.complete(1); + try + { + f.get(); + Assert.assertTrue(false); + } + catch (CancellationException ex) + { + } assertEquals(StreamSession.State.WAIT_COMPLETE, session.state()); // when all streaming are done, time out task should not be scheduled.