[ 
https://issues.apache.org/jira/browse/STORM-378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14183698#comment-14183698
 ] 

ASF GitHub Bot commented on STORM-378:
--------------------------------------

Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/295#issuecomment-60460916
  
    @caofangkun 
    It's mk-threads in executor.clj.
    
    ```
                (if (and (= curr-count (.get emitted-count)) active?)
                  (do (.increment empty-emit-streak)
                      (.emptyEmit spout-wait-strategy (.get empty-emit-streak)))
                  (.set empty-emit-streak 0)
                  ))           
    ```
    
    You can see that the streak increases by 1 on each consecutive empty emit, so I think 
it is intended for alternative implementations of ISpoutWaitStrategy, not for SleepSpoutWaitStrategy.
    (@nathanmarz Could you please confirm this?)
    If it is meant for SleepSpoutWaitStrategy, just adding it to sleepMillis barely affects the sleep time.
    The streak should be multiplied by 10 or something bigger, or maybe we can apply 
an exponential function to the already multiplied streak.
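
To make the two suggestions concrete, here is a minimal sketch of how the streak could scale the sleep time. It is not Storm code: the class name `BackoffSleepSketch`, the method names, the factor argument, and the 10-second cap are all assumptions made for illustration. A real strategy would call one of these from `emptyEmit(long streak)` and pass the result to `Thread.sleep`.

```java
// Hypothetical helper, not part of Storm: computes a sleep time from the
// configured base sleep and the empty-emit streak passed to emptyEmit().
class BackoffSleepSketch {
    // Assumed upper bound so a long idle period cannot produce unbounded sleeps.
    static final long MAX_SLEEP_MILLIS = 10_000L;

    /** Linear variant: base sleep plus streak multiplied by a factor, capped. */
    static long linearSleep(long baseMillis, long streak, long factor) {
        // Clamp the streak first so the multiplication cannot overflow
        // for the small factors this sketch is meant for.
        long cappedStreak = Math.min(Math.max(0L, streak), MAX_SLEEP_MILLIS);
        long extra = cappedStreak * factor;
        return Math.min(MAX_SLEEP_MILLIS, Math.max(0L, baseMillis) + extra);
    }

    /** Exponential variant: base sleep doubled once per empty emit, capped. */
    static long exponentialSleep(long baseMillis, long streak) {
        // Limit the shift so the left shift stays well inside the long range.
        long shift = Math.min(Math.max(0L, streak), 20L);
        long scaled = Math.max(0L, baseMillis) << shift;
        return Math.min(MAX_SLEEP_MILLIS, scaled);
    }
}
```

With a base of 1 ms and a factor of 10, the linear variant yields 11 ms on the first empty emit and 51 ms on the fifth, while the exponential variant reaches the cap after a little over a dozen consecutive empty emits. Whether a cap is wanted at all, and what its value should be, is a design choice this sketch does not settle.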


> SleepSpoutWaitStrategy.emptyEmit should use  the variable "streak"
> ------------------------------------------------------------------
>
>                 Key: STORM-378
>                 URL: https://issues.apache.org/jira/browse/STORM-378
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 0.9.2-incubating
>            Reporter: caofangkun
>            Priority: Minor
>
> {code:java}
> Index: src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
> ===================================================================
> --- src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java  (revision 2868)
> +++ src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java  (working copy)
> @@ -18,6 +18,8 @@
>  package backtype.storm.spout;
>  
>  import backtype.storm.Config;
> +import backtype.storm.utils.Utils;
> +
>  import java.util.Map;
>  
>  
> @@ -27,13 +29,14 @@
>      
>      @Override
>      public void prepare(Map conf) {
> -        sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
> +        sleepMillis = Utils.getLong(
> +            conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS), 500);
>      }
>  
>      @Override
>      public void emptyEmit(long streak) {
>          try {
> -            Thread.sleep(sleepMillis);
> +            Thread.sleep(Math.abs(sleepMillis + streak));
>          } catch (InterruptedException e) {
>              throw new RuntimeException(e);
>          }
> Index: src/jvm/backtype/storm/utils/Utils.java
> ===================================================================
> --- src/jvm/backtype/storm/utils/Utils.java   (revision 2888)
> +++ src/jvm/backtype/storm/utils/Utils.java   (working copy)
> @@ -325,6 +325,24 @@
>            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
>        }
>      }
> +    
> +    public static Long getLong(Object o, long defaultValue) {
> +
> +      if (o == null) {
> +        return defaultValue;
> +      }
> +
> +      if (o instanceof String) {
> +        return Long.valueOf(String.valueOf(o));
> +      } else if (o instanceof Integer) {
> +        Integer value = (Integer) o;
> +        return Long.valueOf((Integer) value);
> +      } else if (o instanceof Long) {
> +        return (Long) o;
> +      } else {
> +        return defaultValue;
> +      }
> +    }
>  
>      public static boolean getBoolean(Object o, boolean defaultValue) {
>        if (null == o) {
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
