Updated Branches: refs/heads/trunk 3455f1b76 -> 3ec4ff5ed
change SimpleCondition.signal to UOE patch by Mikhail Mazursky; reviewed by jbellis for CASSANDRA-5691 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3ec4ff5e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3ec4ff5e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3ec4ff5e Branch: refs/heads/trunk Commit: 3ec4ff5ed3f362fc77f25c31ae16afbb46030624 Parents: 3455f1b Author: Jonathan Ellis <[email protected]> Authored: Wed Jun 26 07:55:28 2013 -0700 Committer: Jonathan Ellis <[email protected]> Committed: Wed Jun 26 07:55:28 2013 -0700 ---------------------------------------------------------------------- .../compaction/ParallelCompactionIterable.java | 26 +++++++++++++------- .../service/AbstractWriteResponseHandler.java | 2 +- .../apache/cassandra/service/ReadCallback.java | 2 +- .../service/TruncateResponseHandler.java | 2 +- .../apache/cassandra/utils/SimpleCondition.java | 5 ++-- 5 files changed, 22 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index 1d380f6..a8baf7a 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -195,7 +195,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable private class DeserializedColumnIterator implements OnDiskAtomIterator { private final Row row; - private Iterator<Column> iter; + private final Iterator<Column> iter; public DeserializedColumnIterator(Row row) { @@ -236,7 +236,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable { private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1); private static final RowContainer finished = new RowContainer((Row) null); - private Condition condition; private final ICompactionScanner scanner; public Deserializer(ICompactionScanner ssts, final int maxInMemorySize) @@ -246,11 +245,14 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable { protected void runMayThrow() throws Exception { + SimpleCondition condition = null; while (true) { if (condition != null) + { condition.await(); - + condition = null; + } if (!scanner.hasNext()) { queue.put(finished); @@ -260,13 +262,13 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable SSTableIdentityIterator iter = (SSTableIdentityIterator) scanner.next(); if (iter.dataSize > maxInMemorySize) { - logger.debug("parallel lazy deserialize from " + iter.getPath()); + logger.debug("parallel lazy deserialize from {}", iter.getPath()); condition = new SimpleCondition(); queue.put(new RowContainer(new NotifyingSSTableIdentityIterator(iter, condition))); } else { - logger.debug("parallel eager deserialize from " + iter.getPath()); + logger.debug("parallel eager deserialize from {}", iter.getPath()); queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory)))); } } @@ -301,9 +303,9 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable private static class NotifyingSSTableIdentityIterator implements OnDiskAtomIterator { private final SSTableIdentityIterator wrapped; - private final Condition condition; + private final SimpleCondition condition; - public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, Condition condition) + public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, SimpleCondition condition) { this.wrapped = wrapped; this.condition = condition; @@ -321,8 +323,14 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable public void close() throws IOException { - wrapped.close(); - condition.signal(); + try + { + wrapped.close(); + } + finally + { + condition.signalAll(); + } } public boolean hasNext() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 7be4c29..1740ee2 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -100,7 +100,7 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback protected void signal() { - condition.signal(); + condition.signalAll(); if (callback != null) callback.run(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 7cb5a23..bd8b025 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -117,7 +117,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag : received.get(); if (n >= blockfor && resolver.isDataPresent()) { - condition.signal(); + condition.signalAll(); maybeResolveForRepair(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/service/TruncateResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java index 3920b91..3bacad8 100644 --- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java +++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java @@ -70,7 +70,7 @@ public class TruncateResponseHandler implements IAsyncCallback { responses.incrementAndGet(); if (responses.get() >= responseCount) - condition.signal(); + condition.signalAll(); } public boolean isLatencyForSnitch() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/utils/SimpleCondition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/SimpleCondition.java b/src/java/org/apache/cassandra/utils/SimpleCondition.java index 6086f3b..4d5f896 100644 --- a/src/java/org/apache/cassandra/utils/SimpleCondition.java +++ b/src/java/org/apache/cassandra/utils/SimpleCondition.java @@ -51,10 +51,9 @@ public class SimpleCondition implements Condition return set; } - public synchronized void signal() + public void signal() { - set = true; - notify(); + throw new UnsupportedOperationException(); } public synchronized void signalAll()
