Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158939356
--- Diff:
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -127,74 +141,130 @@ public void expire(Long key, TupleInfo tupleInfo) {
spoutObject.open(topoConf, taskData.getUserContext(),
outputCollector);
}
openOrPrepareWasCalled.set(true);
- LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
+ LOG.info("Opened spout {}:{}", componentId, taskIds);
setupMetrics();
}
@Override
- public Callable<Object> call() throws Exception {
- init(idToTask);
-
- return new Callable<Object>() {
+ public Callable<Long> call() throws Exception {
+ init(idToTask, idToTaskBase);
+ return new Callable<Long>() {
+ int i=0;
+ final int recvqCheckSkipCount = getSpoutRecvqCheckSkipCount();
+ int bpIdleCount = 0;
+ int rmspCount = 0;
@Override
- public Object call() throws Exception {
- receiveQueue.consumeBatch(SpoutExecutor.this);
-
- final long currCount = emittedCount.get();
- final boolean throttleOn = backPressureEnabled &&
SpoutExecutor.this.throttleOn.get();
- final boolean reachedMaxSpoutPending = (maxSpoutPending !=
0) && (pending.size() >= maxSpoutPending);
- final boolean isActive = stormActive.get();
+ public Long call() throws Exception {
+ int receiveCount = 0;
+ if (i++ == recvqCheckSkipCount) {
+ receiveCount =
receiveQueue.consume(SpoutExecutor.this);
+ i=0;
+ }
+ long currCount = emittedCount.get();
+ boolean reachedMaxSpoutPending = (maxSpoutPending != 0) &&
(pending.size() >= maxSpoutPending);
+ boolean isActive = stormActive.get();
+ boolean noEmits = true;
if (isActive) {
if (!lastActive.get()) {
lastActive.set(true);
- LOG.info("Activating spout {}:{}", componentId,
idToTask.keySet());
+ LOG.info("Activating spout {}:{}", componentId,
taskIds);
for (ISpout spout : spouts) {
spout.activate();
}
}
- if (!transferQueue.isFull() && !throttleOn &&
!reachedMaxSpoutPending) {
- for (ISpout spout : spouts) {
- spout.nextTuple();
+ boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
--- End diff --
yes i tried once and rolled it back.. due to the two levels of callables.
will try again.
---