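Made Sampler operator simpler to use: replaced the passrate/totalrate integer pair with a single samplingPercentage property (a fraction between 0 and 1 inclusive).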
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/e1392b05 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/e1392b05 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/e1392b05 Branch: refs/heads/master Commit: e1392b0540a4c29ffeee1c16f8e6b7478adf94da Parents: dd15161 Author: Timothy Farkas <[email protected]> Authored: Mon Jul 27 12:53:58 2015 -0700 Committer: thomas <[email protected]> Committed: Mon Jul 27 20:23:38 2015 -0700 ---------------------------------------------------------------------- .../java/com/datatorrent/lib/algo/Sampler.java | 56 +++++++------------- .../com/datatorrent/lib/algo/SamplerTest.java | 3 +- 2 files changed, 19 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e1392b05/library/src/main/java/com/datatorrent/lib/algo/Sampler.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/algo/Sampler.java b/library/src/main/java/com/datatorrent/lib/algo/Sampler.java index b91f6b6..8eb7103 100644 --- a/library/src/main/java/com/datatorrent/lib/algo/Sampler.java +++ b/library/src/main/java/com/datatorrent/lib/algo/Sampler.java @@ -17,6 +17,7 @@ package com.datatorrent.lib.algo; import java.util.Random; +import javax.validation.constraints.Max; import javax.validation.constraints.Min; import com.datatorrent.api.DefaultInputPort; @@ -55,7 +56,7 @@ import com.datatorrent.lib.util.BaseKeyOperator; * </p> * * @displayName Sampler - * @category Algorithmic + * @category Stats and Aggregations * @tags filter * * @since 0.3.2 @@ -70,13 +71,13 @@ public class Sampler<K> extends BaseKeyOperator<K> public final transient DefaultInputPort<K> data = new DefaultInputPort<K>() { /** - * Emits the tuple as per probability of 
passrate out of totalrate + * Emits tuples at a rate corresponding to the given samplingPercentage. */ @Override public void process(K tuple) { - int fval = random.nextInt(totalrate); - if (fval >= passrate) { + double val = random.nextDouble(); + if (val > samplingPercentage) { return; } sample.emit(cloneKey(tuple)); @@ -88,49 +89,28 @@ public class Sampler<K> extends BaseKeyOperator<K> */ public final transient DefaultOutputPort<K> sample = new DefaultOutputPort<K>(); - @Min(1) - int passrate = 1; - @Min(1) - int totalrate = 100; - private transient Random random = new Random(); - - /** - * getter function for pass rate - * @return passrate - */ - @Min(1) - public int getPassrate() - { - return passrate; - } + @Min(0) + @Max(1) + private double samplingPercentage = 1.0; - /** - * getter function for total rate - * @return totalrate - */ - @Min(1) - public int getTotalrate() - { - return totalrate; - } + private transient Random random = new Random(); /** - * Sets pass rate - * - * @param val passrate is set to val + * Gets the samplingPercentage. + * @return the samplingPercentage */ - public void setPassrate(int val) + public double getSamplingPercentage() { - passrate = val; + return samplingPercentage; } /** - * Sets total rate - * - * @param val total rate is set to val + * The percentage of tuples to allow to pass through this operator. This percentage should be + * a number between 0 and 1 inclusive. 
+ * @param samplingPercentage the samplingPercentage to set */ - public void setTotalrate(int val) + public void setSamplingPercentage(double samplingPercentage) { - totalrate = val; + this.samplingPercentage = samplingPercentage; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e1392b05/library/src/test/java/com/datatorrent/lib/algo/SamplerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/algo/SamplerTest.java b/library/src/test/java/com/datatorrent/lib/algo/SamplerTest.java index ed21339..5a8c71a 100644 --- a/library/src/test/java/com/datatorrent/lib/algo/SamplerTest.java +++ b/library/src/test/java/com/datatorrent/lib/algo/SamplerTest.java @@ -38,8 +38,7 @@ public class SamplerTest Sampler<String> oper = new Sampler<String>(); CountTestSink sink = new CountTestSink<String>(); oper.sample.setSink(sink); - oper.setPassrate(10); - oper.setTotalrate(100); + oper.setSamplingPercentage(.1); String tuple = "a";
