Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158215083 --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -127,74 +141,130 @@ public void expire(Long key, TupleInfo tupleInfo) { spoutObject.open(topoConf, taskData.getUserContext(), outputCollector); } openOrPrepareWasCalled.set(true); - LOG.info("Opened spout {}:{}", componentId, idToTask.keySet()); + LOG.info("Opened spout {}:{}", componentId, taskIds); setupMetrics(); } @Override - public Callable<Object> call() throws Exception { - init(idToTask); - - return new Callable<Object>() { + public Callable<Long> call() throws Exception { + init(idToTask, idToTaskBase); + return new Callable<Long>() { + int i=0; + final int recvqCheckSkipCount = getSpoutRecvqCheckSkipCount(); + int bpIdleCount = 0; + int rmspCount = 0; @Override - public Object call() throws Exception { - receiveQueue.consumeBatch(SpoutExecutor.this); - - final long currCount = emittedCount.get(); - final boolean throttleOn = backPressureEnabled && SpoutExecutor.this.throttleOn.get(); - final boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending); - final boolean isActive = stormActive.get(); + public Long call() throws Exception { + int receiveCount = 0; + if (i++ == recvqCheckSkipCount) { --- End diff -- minor: we may be able to have longer but better variable name other than `i`, since it's not a temporary short scope variable.
---