Updated Branches: refs/heads/master 8d1886273 -> 146f1e505
CRUNCH-236 Set context on wrapped MapFn in OneToManyJoin

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/146f1e50
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/146f1e50
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/146f1e50

Branch: refs/heads/master
Commit: 146f1e5055114170957f50487a985737c036ea51
Parents: 8d18862
Author: Gabriel Reid <[email protected]>
Authored: Thu Jul 11 10:52:40 2013 +0200
Committer: Gabriel Reid <[email protected]>
Committed: Thu Jul 11 10:52:40 2013 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/join/OneToManyJoin.java | 7 +++++++
 .../java/org/apache/crunch/lib/join/OneToManyJoinTest.java | 1 +
 2 files changed, 8 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f1e50/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
index 7e92738..25556ec 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
@@ -29,6 +29,7 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.lib.Join;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
@@ -91,6 +92,12 @@ public class OneToManyJoin {
       postProcessFn.initialize();
       leftValueType.initialize(getConfiguration());
     }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      super.setContext(context);
+      postProcessFn.setContext(context);
+    }
 
     @Override
     public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input, Emitter<T> emitter) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f1e50/crunch-core/src/test/java/org/apache/crunch/lib/join/OneToManyJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/join/OneToManyJoinTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/join/OneToManyJoinTest.java
index dad4c35..e38fd37 100644
--- a/crunch-core/src/test/java/org/apache/crunch/lib/join/OneToManyJoinTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/join/OneToManyJoinTest.java
@@ -99,6 +99,7 @@ public class OneToManyJoinTest {
 
     @Override
     public Pair<String, String> map(Pair<String, Iterable<String>> input) {
+      increment("counters", "inputcount");
       return Pair.of(input.first(), Joiner.on(',').join(input.second()));
     }
 
