Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158215973 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -127,74 +141,130 @@ public void expire(Long key, TupleInfo tupleInfo) { spoutObject.open(topoConf, taskData.getUserContext(), outputCollector); } openOrPrepareWasCalled.set(true); - LOG.info("Opened spout {}:{}", componentId, idToTask.keySet()); + LOG.info("Opened spout {}:{}", componentId, taskIds); setupMetrics(); } @Override - public Callable<Object> call() throws Exception { - init(idToTask); - - return new Callable<Object>() { + public Callable<Long> call() throws Exception { + init(idToTask, idToTaskBase); + return new Callable<Long>() { + int i=0; + final int recvqCheckSkipCount = getSpoutRecvqCheckSkipCount(); + int bpIdleCount = 0; + int rmspCount = 0; @Override - public Object call() throws Exception { - receiveQueue.consumeBatch(SpoutExecutor.this); - - final long currCount = emittedCount.get(); - final boolean throttleOn = backPressureEnabled && SpoutExecutor.this.throttleOn.get(); - final boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending); - final boolean isActive = stormActive.get(); + public Long call() throws Exception { + int receiveCount = 0; + if (i++ == recvqCheckSkipCount) { + receiveCount = receiveQueue.consume(SpoutExecutor.this); + i=0; + } + long currCount = emittedCount.get(); + boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending); + boolean isActive = stormActive.get(); + boolean noEmits = true; if (isActive) { if (!lastActive.get()) { lastActive.set(true); - LOG.info("Activating spout {}:{}", componentId, idToTask.keySet()); + LOG.info("Activating spout {}:{}", componentId, taskIds); for (ISpout spout : spouts) { spout.activate(); } } - if (!transferQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) { - for (ISpout spout : spouts) { - spout.nextTuple(); + boolean pendingEmitsIsEmpty = tryFlushPendingEmits(); + + long emptyStretch = 0; + if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) { + for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators. + spouts.get(j).nextTuple(); + } + noEmits = (currCount == emittedCount.get()); + if (noEmits) { + emptyEmitStreak.increment(); + } else { + emptyStretch = emptyEmitStreak.get(); + emptyEmitStreak.set(0); } } + if (reachedMaxSpoutPending) { + if(rmspCount==0) + LOG.debug("Reached max spout pending"); + rmspCount++; + } else { + if (rmspCount>0) + LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount); + rmspCount = 0; + } + + if ( receiveCount>1 ) { --- End diff -- Is it intentional to compare this with 1? I guess it looks natural to compare with 0, but you may have some reasons to do it.
---