Repository: crunch Updated Branches: refs/heads/master 4e1cc9ede -> 050b4a9e1
CRUNCH-499: Propagate context to wrapped DoFn in DoFns.detach() When wrapping a DoFn in DoFns.detach(), the context does not get passed down to the wrapped DoFn when setContext is invoked, meaning counter incrementing and other features which rely on the context being available don't work as expected. This patch makes setContext change the context of both the wrapper and the delegate DoFn. Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/050b4a9e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/050b4a9e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/050b4a9e Branch: refs/heads/master Commit: 050b4a9e18f25cf9615dc07be21c5389624f58c9 Parents: 4e1cc9e Author: David Whiting <[email protected]> Authored: Tue Feb 10 15:34:28 2015 +0100 Committer: Josh Wills <[email protected]> Committed: Tue Feb 10 08:12:38 2015 -0800 ---------------------------------------------------------------------- crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java | 7 +++++++ 1 file changed, 7 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/050b4a9e/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java b/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java index cbf819f..d962344 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java @@ -24,6 +24,7 @@ import org.apache.crunch.Emitter; import org.apache.crunch.Pair; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; import java.io.Serializable; @@ -78,6 +79,12 @@ public class DoFns { } @Override + 
public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + super.setContext(context); + reduceFn.setContext(context); + } + + @Override public void configure(Configuration configuration) { super.configure(configuration); reduceFn.configure(configuration);
