Updated Branches: refs/heads/master 018d16984 -> e8b9d4b2a
CRUNCH-142 Call delegate setContext and configure. Call the delegate setContext and configure methods from FilterFn decorators (And, Or, Not). Contributed by Dave Beech. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/e8b9d4b2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/e8b9d4b2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/e8b9d4b2 Branch: refs/heads/master Commit: e8b9d4b2aa651503aa03a5a75f13aa6e6f9347f7 Parents: 018d169 Author: Gabriel Reid <[email protected]> Authored: Mon Jan 14 16:20:54 2013 +0100 Committer: Gabriel Reid <[email protected]> Committed: Mon Jan 14 16:47:15 2013 +0100 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/FilterFn.java | 43 +++++++++++++++ 1 files changed, 43 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/e8b9d4b2/crunch/src/main/java/org/apache/crunch/FilterFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/FilterFn.java b/crunch/src/main/java/org/apache/crunch/FilterFn.java index 467400f..010afed 100644 --- a/crunch/src/main/java/org/apache/crunch/FilterFn.java +++ b/crunch/src/main/java/org/apache/crunch/FilterFn.java @@ -20,6 +20,8 @@ package org.apache.crunch; import java.util.List; import org.apache.crunch.fn.FilterFns; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; import com.google.common.collect.ImmutableList; @@ -63,6 +65,21 @@ public abstract class FilterFn<T> extends DoFn<T, T> { public AndFn(FilterFn<S>... 
fns) { this.fns = ImmutableList.<FilterFn<S>> copyOf(fns); } + + @Override + public void configure(Configuration conf) { + for (FilterFn<S> fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + for (FilterFn<S> fn : fns) { + fn.setContext(context); + } + initialize(); + } @Override public boolean accept(S input) { @@ -101,6 +118,21 @@ public abstract class FilterFn<T> extends DoFn<T, T> { public OrFn(FilterFn<S>... fns) { this.fns = ImmutableList.<FilterFn<S>> copyOf(fns); } + + @Override + public void configure(Configuration conf) { + for (FilterFn<S> fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + for (FilterFn<S> fn : fns) { + fn.setContext(context); + } + initialize(); + } @Override public boolean accept(S input) { @@ -139,8 +171,19 @@ public abstract class FilterFn<T> extends DoFn<T, T> { public NotFn(FilterFn<S> base) { this.base = base; } + + @Override + public void configure(Configuration conf) { + base.configure(conf); + } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + base.setContext(context); + initialize(); + } + + @Override public boolean accept(S input) { return !base.accept(input); }
