Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r158215973
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
    @@ -127,74 +141,130 @@ public void expire(Long key, TupleInfo tupleInfo) {
                 spoutObject.open(topoConf, taskData.getUserContext(), 
outputCollector);
             }
             openOrPrepareWasCalled.set(true);
    -        LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
    +        LOG.info("Opened spout {}:{}", componentId, taskIds);
             setupMetrics();
         }
     
         @Override
    -    public Callable<Object> call() throws Exception {
    -        init(idToTask);
    -
    -        return new Callable<Object>() {
    +    public Callable<Long> call() throws Exception {
    +        init(idToTask, idToTaskBase);
    +        return new Callable<Long>() {
    +            int i=0;
    +            final int recvqCheckSkipCount = getSpoutRecvqCheckSkipCount();
    +            int bpIdleCount = 0;
    +            int rmspCount = 0;
                 @Override
    -            public Object call() throws Exception {
    -                receiveQueue.consumeBatch(SpoutExecutor.this);
    -
    -                final long currCount = emittedCount.get();
    -                final boolean throttleOn = backPressureEnabled && 
SpoutExecutor.this.throttleOn.get();
    -                final boolean reachedMaxSpoutPending = (maxSpoutPending != 
0) && (pending.size() >= maxSpoutPending);
    -                final boolean isActive = stormActive.get();
    +            public Long call() throws Exception {
    +                int receiveCount = 0;
    +                if (i++ == recvqCheckSkipCount) {
    +                    receiveCount = 
receiveQueue.consume(SpoutExecutor.this);
    +                    i=0;
    +                }
    +                long currCount = emittedCount.get();
    +                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && 
(pending.size() >= maxSpoutPending);
    +                boolean isActive = stormActive.get();
    +                boolean noEmits = true;
                     if (isActive) {
                         if (!lastActive.get()) {
                             lastActive.set(true);
    -                        LOG.info("Activating spout {}:{}", componentId, 
idToTask.keySet());
    +                        LOG.info("Activating spout {}:{}", componentId, 
taskIds);
                             for (ISpout spout : spouts) {
                                 spout.activate();
                             }
                         }
    -                    if (!transferQueue.isFull() && !throttleOn && 
!reachedMaxSpoutPending) {
    -                        for (ISpout spout : spouts) {
    -                            spout.nextTuple();
    +                    boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
    +
    +                    long emptyStretch = 0;
    +                    if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
    +                        for (int j = 0; j < spouts.size(); j++) { // in 
critical path. don't use iterators.
    +                            spouts.get(j).nextTuple();
    +                        }
    +                        noEmits = (currCount == emittedCount.get());
    +                        if (noEmits) {
    +                            emptyEmitStreak.increment();
    +                        } else {
    +                            emptyStretch = emptyEmitStreak.get();
    +                            emptyEmitStreak.set(0);
                             }
                         }
    +                    if (reachedMaxSpoutPending) {
    +                        if(rmspCount==0)
    +                            LOG.debug("Reached max spout pending");
    +                        rmspCount++;
    +                    } else {
    +                        if (rmspCount>0)
    +                            LOG.debug("Ended max spout pending stretch of 
{} iterations", rmspCount);
    +                        rmspCount = 0;
    +                    }
    +
    +                    if ( receiveCount>1 ) {
    --- End diff --
    
    Is it intentional to compare this with 1? I guess it looks natural to 
compare with 0, but you may have some reasons to do it.


---

Reply via email to