Updated Branches: refs/heads/master 64497fa4f -> 3e513cfab
CRUNCH-191 Detached retained values in Distinct

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3e513cfa
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3e513cfa
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3e513cfa

Branch: refs/heads/master
Commit: 3e513cfabf7d37321c868ea8007aa3c9d202e644
Parents: 64497fa
Author: Gabriel Reid <[email protected]>
Authored: Fri Apr 5 21:32:31 2013 +0200
Committer: Gabriel Reid <[email protected]>
Committed: Fri Apr 5 21:32:31 2013 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Distinct.java | 14 +++++++++++---
 1 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/3e513cfa/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
index 533f3fb..994830d 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
@@ -69,7 +69,7 @@ public final class Distinct {
     PType<S> pt = input.getPType();
     PTypeFamily ptf = pt.getFamily();
     return input
-        .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery), ptf.tableOf(pt, ptf.nulls()))
+        .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery, pt), ptf.tableOf(pt, ptf.nulls()))
         .groupByKey()
         .parallelDo("post-distinct", new PostDistinctFn<S>(), pt);
   }
@@ -84,14 +84,22 @@ public final class Distinct {
   private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> {
     private final Set<S> values = Sets.newHashSet();
     private final int flushEvery;
+    private final PType<S> ptype;
 
-    public PreDistinctFn(int flushEvery) {
+    public PreDistinctFn(int flushEvery, PType<S> ptype) {
       this.flushEvery = flushEvery;
+      this.ptype = ptype;
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      ptype.initialize(getConfiguration());
     }
 
     @Override
     public void process(S input, Emitter<Pair<S, Void>> emitter) {
-      values.add(input);
+      values.add(ptype.getDetachedValue(input));
       if (values.size() > flushEvery) {
         cleanup(emitter);
       }
