GitHub user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r158215083
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
    @@ -127,74 +141,130 @@ public void expire(Long key, TupleInfo tupleInfo) {
                 spoutObject.open(topoConf, taskData.getUserContext(), outputCollector);
             }
             openOrPrepareWasCalled.set(true);
    -        LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
    +        LOG.info("Opened spout {}:{}", componentId, taskIds);
             setupMetrics();
         }
     
         @Override
    -    public Callable<Object> call() throws Exception {
    -        init(idToTask);
    -
    -        return new Callable<Object>() {
    +    public Callable<Long> call() throws Exception {
    +        init(idToTask, idToTaskBase);
    +        return new Callable<Long>() {
    +            int i=0;
    +            final int recvqCheckSkipCount = getSpoutRecvqCheckSkipCount();
    +            int bpIdleCount = 0;
    +            int rmspCount = 0;
                 @Override
    -            public Object call() throws Exception {
    -                receiveQueue.consumeBatch(SpoutExecutor.this);
    -
    -                final long currCount = emittedCount.get();
    -                final boolean throttleOn = backPressureEnabled && SpoutExecutor.this.throttleOn.get();
    -                final boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
    -                final boolean isActive = stormActive.get();
    +            public Long call() throws Exception {
    +                int receiveCount = 0;
    +                if (i++ == recvqCheckSkipCount) {
    --- End diff --
    
    minor: we could give `i` a longer, more descriptive name, since it is a 
    field that persists across calls rather than a short-lived, narrowly 
    scoped temporary variable.
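
To illustrate the naming suggestion, here is a minimal, self-contained sketch of a skip counter with a descriptive field name. It is not the actual `SpoutExecutor` code: the class `RecvqSkipSketch`, the field names `callsSinceRecvqCheck` and `recvqChecks`, and the reset to zero inside the `if` are all assumptions, since the body of the `if` is not visible in the diff above.

```java
import java.util.concurrent.Callable;

/** Hypothetical sketch only; names and reset behavior are assumptions. */
class RecvqSkipSketch implements Callable<Long> {
    // Number of call() invocations to skip between receive-queue checks.
    private final int recvqCheckSkipCount;
    // Descriptive stand-in for `i`: calls made since the last queue check.
    private int callsSinceRecvqCheck = 0;
    // Stands in for real queue consumption; counts how often we "checked".
    private long recvqChecks = 0;

    RecvqSkipSketch(int recvqCheckSkipCount) {
        this.recvqCheckSkipCount = recvqCheckSkipCount;
    }

    @Override
    public Long call() {
        // Same comparison shape as the diff: compare, then post-increment.
        if (callsSinceRecvqCheck++ == recvqCheckSkipCount) {
            callsSinceRecvqCheck = 0; // assumed reset, not shown in the diff
            recvqChecks++;
        }
        return recvqChecks;
    }
}
```

With the name spelled out, a reader can tell at the `if` what is being counted without scrolling back to the field declaration.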

