Repository: cassandra Updated Branches: refs/heads/trunk ba22af17c -> fe0572778
Fix race condition in StreamTransferTask that could lead to infinite loops and premature sstable deletion patch by benedict; reviewed by yukim for CASSANDRA-7704 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8e42d5fe Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8e42d5fe Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8e42d5fe Branch: refs/heads/trunk Commit: 8e42d5fe86f6b2f4dd636ed77793d6ab69792895 Parents: 115bbe4 Author: Benedict Elliott Smith <[email protected]> Authored: Sat Aug 16 12:11:45 2014 +0700 Committer: Benedict Elliott Smith <[email protected]> Committed: Sat Aug 16 12:11:45 2014 +0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cassandra/streaming/StreamTransferTask.java | 75 +++++++++++++------- .../streaming/StreamTransferTaskTest.java | 21 ++++-- 3 files changed, 65 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e42d5fe/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 987c227..cf4a115 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.0.10 + * Fix race condition in StreamTransferTask that could lead to + infinite loops and premature sstable deletion (CASSANDRA-7704) * (cqlsh) Wait up to 10 sec for a tracing session (CASSANDRA-7222) * Fix NPE in FileCacheService.sizeInBytes (CASSANDRA-7756) * (cqlsh) cqlsh should automatically disable tracing when selecting http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e42d5fe/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 
a543d01..629c6bb 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -19,8 +19,12 @@ package org.apache.cassandra.streaming; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; +import io.netty.util.concurrent.*; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.streaming.messages.OutgoingFileMessage; import org.apache.cassandra.utils.Pair; @@ -30,13 +34,13 @@ import org.apache.cassandra.utils.Pair; */ public class StreamTransferTask extends StreamTask { - private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); + private static final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StreamingTransferTaskTimeouts")); private final AtomicInteger sequenceNumber = new AtomicInteger(0); + private boolean aborted = false; - private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>(); - - private final Map<Integer, ScheduledFuture> timeoutTasks = new ConcurrentHashMap<>(); + private final Map<Integer, OutgoingFileMessage> files = new HashMap<>(); + private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>(); private long totalSize; @@ -45,7 +49,7 @@ public class StreamTransferTask extends StreamTask super(session, cfId); } - public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections) + public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections) { assert sstable != null && cfId.equals(sstable.metadata.cfId); OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), 
estimatedKeys, sections); @@ -58,31 +62,42 @@ public class StreamTransferTask extends StreamTask * * @param sequenceNumber sequence number of file */ - public synchronized void complete(int sequenceNumber) + public void complete(int sequenceNumber) { - OutgoingFileMessage file = files.remove(sequenceNumber); - if (file != null) + boolean signalComplete; + synchronized (this) { - file.sstable.releaseReference(); - // all file sent, notify session this task is complete. - if (files.isEmpty()) - { - timeoutExecutor.shutdownNow(); - session.taskCompleted(this); - } + ScheduledFuture timeout = timeoutTasks.remove(sequenceNumber); + if (timeout != null) + timeout.cancel(false); + + OutgoingFileMessage file = files.remove(sequenceNumber); + if (file != null) + file.sstable.releaseReference(); + + signalComplete = files.isEmpty(); } + + // all file sent, notify session this task is complete. + if (signalComplete) + session.taskCompleted(this); } - public void abort() + public synchronized void abort() { + if (aborted) + return; + aborted = true; + + for (ScheduledFuture future : timeoutTasks.values()) + future.cancel(false); + timeoutTasks.clear(); + for (OutgoingFileMessage file : files.values()) - { file.sstable.releaseReference(); - } - timeoutExecutor.shutdownNow(); } - public int getTotalNumberOfFiles() + public synchronized int getTotalNumberOfFiles() { return files.size(); } @@ -92,17 +107,17 @@ public class StreamTransferTask extends StreamTask return totalSize; } - public Collection<OutgoingFileMessage> getFileMessages() + public synchronized Collection<OutgoingFileMessage> getFileMessages() { // We may race between queuing all those messages and the completion of the completion of - // the first ones. So copy the values to avoid a ConcurrentModificationException + // the first ones. 
So copy the values to avoid a ConcurrentModificationException return new ArrayList<>(files.values()); } public synchronized OutgoingFileMessage createMessageForRetry(int sequenceNumber) { // remove previous time out task to be rescheduled later - ScheduledFuture future = timeoutTasks.get(sequenceNumber); + ScheduledFuture future = timeoutTasks.remove(sequenceNumber); if (future != null) future.cancel(false); return files.get(sequenceNumber); @@ -120,18 +135,24 @@ public class StreamTransferTask extends StreamTask */ public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit) { - if (timeoutExecutor.isShutdown()) + if (!files.containsKey(sequenceNumber)) return null; ScheduledFuture future = timeoutExecutor.schedule(new Runnable() { public void run() { - StreamTransferTask.this.complete(sequenceNumber); - timeoutTasks.remove(sequenceNumber); + synchronized (StreamTransferTask.this) + { + // remove so we don't cancel ourselves + timeoutTasks.remove(sequenceNumber); + StreamTransferTask.this.complete(sequenceNumber); + } } }, time, unit); - timeoutTasks.put(sequenceNumber, future); + + ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future); + assert prev == null; return future; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e42d5fe/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index ce0f9d0..cc41a8b 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -19,11 +19,14 @@ package org.apache.cassandra.streaming; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Future; import 
java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.junit.Test; +import junit.framework.Assert; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -43,7 +46,7 @@ public class StreamTransferTaskTest extends SchemaLoader String ks = "Keyspace1"; String cf = "Standard1"; - StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null); + StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null, 0); ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf); // create two sstables @@ -59,19 +62,25 @@ public class StreamTransferTaskTest extends SchemaLoader { List<Range<Token>> ranges = new ArrayList<>(); ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); - task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges)); + task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges), 0); } assertEquals(2, task.getTotalNumberOfFiles()); // if file sending completes before timeout then the task should be canceled. - ScheduledFuture f = task.scheduleTimeout(0, 1, TimeUnit.SECONDS); - task.complete(0); - // timeout task may run after complete but it is noop + Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS); f.get(); // when timeout runs on second file, task should be completed f = task.scheduleTimeout(1, 1, TimeUnit.MILLISECONDS); - f.get(); + task.complete(1); + try + { + f.get(); + Assert.assertTrue(false); + } + catch (CancellationException ex) + { + } assertEquals(StreamSession.State.WAIT_COMPLETE, session.state()); // when all streaming are done, time out task should not be scheduled.
