Repository: accumulo Updated Branches: refs/heads/master 51f39d292 -> 488f441f7
ACCUMULO-3745 simplify locking and added comments changes from [~elserj] CR on issue Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5ac1b52e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5ac1b52e Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5ac1b52e Branch: refs/heads/master Commit: 5ac1b52efcd94b939773ef88f9f4f0bfa4fccaa2 Parents: 95e234c Author: Keith Turner <ke...@deenlo.com> Authored: Thu Apr 23 12:00:36 2015 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Fri Apr 24 19:09:37 2015 -0400 ---------------------------------------------------------------------- .../system/SourceSwitchingIterator.java | 58 +++++++++++--------- 1 file changed, 33 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5ac1b52e/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java index 7684352..ec73c27 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java @@ -19,7 +19,6 @@ package org.apache.accumulo.core.iterators.system; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -63,17 +62,28 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value private boolean onlySwitchAfterRow; + // Synchronization on copies synchronizes operations across all 
deep copies of this instance. + // + // This implementation assumes that there is one thread reading data (a scan) from all deep copies + // and that another thread may call switch at any point. A single scan may have multiple deep + // copies of this iterator if other iterators above this one duplicate their source. For example, + // if an IntersectingIterator over two columns was configured, `copies` would contain two SSIs + // instead of just one SSI. The two instances in `copies` would both be at the same "level" + // in the tree of iterators for the scan. If multiple instances of SSI are configured in the iterator + // tree (e.g. priority 8 and priority 12), each instance would share their own `copies` e.g. + // SSI@priority8:copies1[...], SSI@priority12:copies2[...] + private final List<SourceSwitchingIterator> copies; private SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow, List<SourceSwitchingIterator> copies) { this.source = source; this.onlySwitchAfterRow = onlySwitchAfterRow; this.copies = copies; + copies.add(this); } public SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow) { - this(source, onlySwitchAfterRow, Collections.synchronizedList(new ArrayList<SourceSwitchingIterator>())); - copies.add(this); + this(source, onlySwitchAfterRow, new ArrayList<SourceSwitchingIterator>()); } public SourceSwitchingIterator(DataSource source) { @@ -83,11 +93,7 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { synchronized (copies) { - synchronized(this){ - SourceSwitchingIterator ssi = new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow, copies); - copies.add(ssi); - return ssi; - } + return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow, copies); } } @@ -113,10 +119,12 @@ public class SourceSwitchingIterator implements 
SortedKeyValueIterator<Key,Value @Override public void next() throws IOException { - readNext(false); + synchronized (copies) { + readNext(false); + } } - private synchronized void readNext(boolean initialSeek) throws IOException { + private void readNext(boolean initialSeek) throws IOException { // check of initialSeek second is intentional so that it does not short // circuit the call to switchSource @@ -162,18 +170,20 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value } @Override - public synchronized void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { - this.range = range; - this.inclusive = inclusive; - this.columnFamilies = columnFamilies; + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + synchronized (copies) { + this.range = range; + this.inclusive = inclusive; + this.columnFamilies = columnFamilies; - if (iter == null) - iter = source.iterator(); + if (iter == null) + iter = source.iterator(); - readNext(true); + readNext(true); + } } - private synchronized void _switchNow() throws IOException { + private void _switchNow() throws IOException { if (onlySwitchAfterRow) throw new IllegalStateException("Can only switch on row boundries"); @@ -194,15 +204,13 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value @Override public void setInterruptFlag(AtomicBoolean flag) { synchronized (copies) { - synchronized (this) { - if (copies.size() != 1) - throw new IllegalStateException("setInterruptFlag() called after deep copies made " + copies.size()); + if (copies.size() != 1) + throw new IllegalStateException("setInterruptFlag() called after deep copies made " + copies.size()); - if (iter != null) - ((InterruptibleIterator) iter).setInterruptFlag(flag); + if (iter != null) + ((InterruptibleIterator) iter).setInterruptFlag(flag); - source.setInterruptFlag(flag); - } + 
source.setInterruptFlag(flag); } } }