Merge branch 'task' of https://github.com/abhishekagarwal87/storm into STORM-1271
STORM-1271: Port backtype.storm.daemon.task to java Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/31d558ca Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/31d558ca Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/31d558ca Branch: refs/heads/master Commit: 31d558cad5c8ee99f0e3540203cfa7e04341d6f8 Parents: 223b615 7d63cb3 Author: Robert (Bobby) Evans <[email protected]> Authored: Thu Mar 31 14:27:36 2016 -0500 Committer: Robert (Bobby) Evans <[email protected]> Committed: Thu Mar 31 14:27:36 2016 -0500 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/config.clj | 27 -- .../clj/org/apache/storm/daemon/executor.clj | 222 +++++++---------- .../src/clj/org/apache/storm/daemon/task.clj | 190 -------------- .../org/apache/storm/daemon/GrouperFactory.java | 243 ++++++++++++++++++ .../src/jvm/org/apache/storm/daemon/Task.java | 247 +++++++++++++++++++ .../daemon/metrics/BuiltinMetricsUtil.java | 8 +- .../apache/storm/hooks/info/BoltAckInfo.java | 8 + .../storm/hooks/info/BoltExecuteInfo.java | 8 + .../apache/storm/hooks/info/BoltFailInfo.java | 8 + .../org/apache/storm/hooks/info/EmitInfo.java | 9 + .../apache/storm/hooks/info/SpoutAckInfo.java | 9 + .../apache/storm/hooks/info/SpoutFailInfo.java | 9 + .../jvm/org/apache/storm/utils/ConfigUtils.java | 35 ++- .../test/clj/org/apache/storm/grouping_test.clj | 19 +- 14 files changed, 675 insertions(+), 367 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/31d558ca/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java index d5f80db,30d314f..0f53343 --- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java @@@ -135,9 -137,30 +137,30 @@@ public class ConfigUtils throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate); } - // public static mkStatsSampler // depends on Utils.evenSampler() TODO, this is sth we need to do after util + public static Callable<Boolean> evenSampler(final int samplingFreq) { + final Random random = new Random(); + + return new Callable<Boolean>() { + private int curr = -1; + private int target = random.nextInt(samplingFreq); + + @Override + public Boolean call() throws Exception { + curr++; + if (curr >= samplingFreq) { + curr = 0; + target = random.nextInt(samplingFreq); + } + return (curr == target); + } + }; + } + + public static Callable<Boolean> mkStatsSampler(Map conf) { + return evenSampler(samplingRate(conf)); + } - // we use this "wired" wrapper pattern temporarily for mocking in clojure test + // we use this "weird" wrapper pattern temporarily for mocking in clojure test public static Map readStormConfig() { return _instance.readStormConfigImpl(); }
