Updated Branches: refs/heads/apache-crunch-0.8 deca72853 -> 243d2ff27
CRUNCH-332: Ensure DoFn.configure is called prior to DoFn.setContext and DoFn.initialize in the in-memory impl

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/243d2ff2
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/243d2ff2
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/243d2ff2

Branch: refs/heads/apache-crunch-0.8
Commit: 243d2ff27e02b07f4b1a9297080fdb34b66b0b88
Parents: deca728
Author: Josh Wills <[email protected]>
Authored: Fri Jan 24 08:31:10 2014 -0800
Committer: Josh Wills <[email protected]>
Committed: Fri Jan 24 08:51:02 2014 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/impl/mem/collect/MemCollection.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/243d2ff2/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index bbcdc0b..81433eb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -112,7 +112,9 @@ public class MemCollection<S> implements PCollection<S> {
   public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type, ParallelDoOptions options) {
     InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
-    doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
+    Configuration conf = getPipeline().getConfiguration();
+    doFn.configure(conf);
+    doFn.setContext(getInMemoryContext(conf));
     doFn.initialize();
     for (S s : collect) {
       doFn.process(s, emitter);
@@ -135,7 +137,9 @@ public class MemCollection<S> implements PCollection<S> {
   public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type, ParallelDoOptions options) {
     InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
-    doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
+    Configuration conf = getPipeline().getConfiguration();
+    doFn.configure(conf);
+    doFn.setContext(getInMemoryContext(conf));
     doFn.initialize();
     for (S s : collect) {
       doFn.process(s, emitter);
