Repository: crunch Updated Branches: refs/heads/master f12eab83e -> 3740f3b1b
CRUNCH-471: Make CacheEmitter methods inside of DoFnIterator synchronized. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3740f3b1 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3740f3b1 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3740f3b1 Branch: refs/heads/master Commit: 3740f3b1b955d1258cc5f1105f1fe97b8109002c Parents: f12eab8 Author: Josh Wills <[email protected]> Authored: Wed Sep 24 14:11:38 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Wed Sep 24 14:11:38 2014 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/util/DoFnIterator.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/3740f3b1/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java b/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java index 0877a8f..c35838e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java +++ b/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java @@ -21,8 +21,10 @@ import com.google.common.collect.Lists; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; /** * An {@code Iterator<T>} that combines a delegate {@code Iterator<S>} and a {@code DoFn<S, T>}, generating @@ -77,16 +79,16 @@ public class DoFnIterator<S, T> implements Iterator<T> { this.cache = Lists.newLinkedList(); } - public boolean isEmpty() { + public synchronized boolean isEmpty() { return cache.isEmpty(); } - public T poll() { + public synchronized T poll() { return cache.poll(); } @Override - public void emit(T emitted) { + public synchronized void emit(T emitted) { cache.add(emitted); }
