Updated Branches: refs/heads/master 0b8677579 -> 5d30af0c0
CRUNCH-183 Handle object reuse in reservoir sampling

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5d30af0c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5d30af0c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5d30af0c

Branch: refs/heads/master
Commit: 5d30af0c03a565894aa1cd7226ab13f90437548a
Parents: 0b86775
Author: Gabriel Reid <[email protected]>
Authored: Sun Mar 24 18:07:56 2013 +0100
Committer: Gabriel Reid <[email protected]>
Committed: Sun Mar 24 20:17:01 2013 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Sample.java |  4 +-
 .../java/org/apache/crunch/lib/SampleUtils.java | 17 +++++++++-----
 2 files changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d30af0c/crunch/src/main/java/org/apache/crunch/lib/Sample.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sample.java b/crunch/src/main/java/org/apache/crunch/lib/Sample.java
index be75ae2..5a66101 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Sample.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sample.java
@@ -203,9 +203,9 @@ public class Sample {
     PTableType<Integer, Pair<Double, T>> ptt = ptf.tableOf(ptf.ints(),
         ptf.pairs(ptf.doubles(), ttype));
 
-    return input.parallelDo(new ReservoirSampleFn<T, N>(sampleSizes, seed), ptt)
+    return input.parallelDo(new ReservoirSampleFn<T, N>(sampleSizes, seed, ttype), ptt)
         .groupByKey(1)
-        .combineValues(new WRSCombineFn<T>(sampleSizes))
+        .combineValues(new WRSCombineFn<T>(sampleSizes, ttype))
         .parallelDo(new MapFn<Pair<Integer, Pair<Double, T>>, Pair<Integer, T>>() {
           @Override
           public Pair<Integer, T> map(Pair<Integer, Pair<Double, T>> p) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d30af0c/crunch/src/main/java/org/apache/crunch/lib/SampleUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/SampleUtils.java b/crunch/src/main/java/org/apache/crunch/lib/SampleUtils.java
index cbc30e4..d1cad35 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/SampleUtils.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/SampleUtils.java
@@ -27,6 +27,7 @@ import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -65,12 +66,14 @@ class SampleUtils {
 
     private int[] sampleSizes;
     private Long seed;
+    private PType<T> valueType;
     private transient List<SortedMap<Double, T>> reservoirs;
     private transient Random random;
 
-    public ReservoirSampleFn(int[] sampleSizes, Long seed) {
+    public ReservoirSampleFn(int[] sampleSizes, Long seed, PType<T> valueType) {
       this.sampleSizes = sampleSizes;
       this.seed = seed;
+      this.valueType = valueType;
     }
 
     @Override
@@ -98,10 +101,10 @@ class SampleUtils {
         double score = Math.log(random.nextDouble()) / weight;
         SortedMap<Double, T> reservoir = reservoirs.get(id);
         if (reservoir.size() < sampleSizes[id]) {
-          reservoir.put(score, p.first());
+          reservoir.put(score, valueType.getDetachedValue(p.first()));
         } else if (score > reservoir.firstKey()) {
           reservoir.remove(reservoir.firstKey());
-          reservoir.put(score, p.first());
+          reservoir.put(score, valueType.getDetachedValue(p.first()));
         }
       }
     }
@@ -120,10 +123,12 @@ class SampleUtils {
   static class WRSCombineFn<T> extends CombineFn<Integer, Pair<Double, T>> {
 
     private int[] sampleSizes;
+    private PType<T> valueType;
     private List<SortedMap<Double, T>> reservoirs;
 
-    public WRSCombineFn(int[] sampleSizes) {
+    public WRSCombineFn(int[] sampleSizes, PType<T> valueType) {
       this.sampleSizes = sampleSizes;
+      this.valueType = valueType;
     }
 
     @Override
@@ -140,10 +145,10 @@ class SampleUtils {
       SortedMap<Double, T> reservoir = reservoirs.get(input.first());
       for (Pair<Double, T> p : input.second()) {
         if (reservoir.size() < sampleSizes[input.first()]) {
-          reservoir.put(p.first(), p.second());
+          reservoir.put(p.first(), valueType.getDetachedValue(p.second()));
         } else if (p.first() > reservoir.firstKey()) {
           reservoir.remove(reservoir.firstKey());
-          reservoir.put(p.first(), p.second());
+          reservoir.put(p.first(), valueType.getDetachedValue(p.second()));
         }
       }
     }
