[ 
https://issues.apache.org/jira/browse/STORM-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475804#comment-16475804
 ] 

Stig Rohde Døssing commented on STORM-3073:
-------------------------------------------

For the spout executors I don't think this is a big issue. As far as I can
tell, it only happens in three cases: the spout emits from a method other than
nextTuple, the spout is blocked for so long that it receives 1024 metrics ticks
while unable to flush, or the spout emits many tuples in one nextTuple call
(which is a bad idea anyway, because it prevents maxSpoutPending from working
properly). "Fixing" this by allowing pendingEmits to grow unbounded would
likely just trade the crash for unbounded memory growth and an OOME in cases
like spouts emitting from fail().
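The maxSpoutPending point can be illustrated with a heavily simplified model. It assumes that the executor checks the pending count only before calling nextTuple, never during the call. MaxSpoutPendingModel and its methods are hypothetical and are not Storm's SpoutExecutor code.

```java
// Hypothetical model, not Storm's SpoutExecutor: the executor checks
// maxSpoutPending only before calling nextTuple(), never during the call.
class MaxSpoutPendingModel {
    private final int maxSpoutPending;
    private final int tuplesPerNextTuple;
    private int pendingCount = 0; // tuples emitted but not yet acked or failed

    MaxSpoutPendingModel(int maxSpoutPending, int tuplesPerNextTuple) {
        this.maxSpoutPending = maxSpoutPending;
        this.tuplesPerNextTuple = tuplesPerNextTuple;
    }

    /** One executor loop iteration; returns true if nextTuple() was called. */
    boolean runOnce() {
        if (pendingCount >= maxSpoutPending) {
            return false; // throttled, so nextTuple() is skipped
        }
        // Every emit inside a single nextTuple() call counts, even past the cap.
        pendingCount += tuplesPerNextTuple;
        return true;
    }

    int pendingCount() {
        return pendingCount;
    }

    public static void main(String[] args) {
        MaxSpoutPendingModel oneAtATime = new MaxSpoutPendingModel(100, 1);
        MaxSpoutPendingModel bursty = new MaxSpoutPendingModel(100, 2048);
        // No acks arrive in this model, so the pending count only grows.
        for (int i = 0; i < 1000; i++) {
            oneAtATime.runOnce();
            bursty.runOnce();
        }
        System.out.println(oneAtATime.pendingCount()); // prints 100
        System.out.println(bursty.pendingCount());     // prints 2048
    }
}
```

The one-at-a-time spout stops at the cap. The bursty spout overshoots it by a factor of about 20 in its very first call.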

I think it might be an issue for bolts, though, since this caps how many
tuples a bolt can safely emit per input tuple without risking a worker crash.
For example, I modified ExclamationTopology so the bolt emits 2048 copies of
each input tuple, and this crashes the worker whenever the bolt executor has
to put the tuples in pendingEmits.
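Here is a minimal sketch of the bolt failure mode. It assumes that pendingEmits behaves like a bounded `java.util.concurrent.ArrayBlockingQueue` of capacity 1024 while the downstream transfer is blocked. The trace below shows that the real queue is some `AbstractQueue` subclass, but the exact type is an assumption here, and so are the class and method names.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model: a bolt emits many tuples for one input while the
// downstream transfer is blocked, so every emit is parked in pendingEmits.
class BoundedPendingEmitsDemo {
    // Assumed capacity, matching the 1024 figure mentioned above.
    static final int PENDING_EMITS_CAPACITY = 1024;

    /** Parks `copies` tuples derived from one input tuple in pendingEmits. */
    static void emitCopies(Queue<String> pendingEmits, String input, int copies) {
        for (int i = 0; i < copies; i++) {
            // Queue.add() throws IllegalStateException("Queue full") on a full
            // bounded queue instead of returning false like offer() does.
            pendingEmits.add(input + "-" + i);
        }
    }

    public static void main(String[] args) {
        Queue<String> pendingEmits = new ArrayBlockingQueue<>(PENDING_EMITS_CAPACITY);
        try {
            emitCopies(pendingEmits, "hello", 2048);
        } catch (IllegalStateException e) {
            // In a worker this exception escapes the executor loop ("Async loop died!").
            System.out.println(e.getMessage() + " after " + pendingEmits.size() + " tuples");
        }
    }
}
```

Running it prints `Queue full after 1024 tuples`. That is the same `IllegalStateException` that appears in the "Caused by" section of the trace.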

We should be able to fix this by uncapping the pendingEmits size (maybe only
for bolts; I'm not sure there's a good reason to do it for spouts).
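The proposed fix might look like the following sketch, which uses a hypothetical `newPendingEmits` factory. Bolts get an unbounded `ConcurrentLinkedQueue` and spouts keep a bounded queue. The queue types and the 1024 capacity are assumptions for illustration, not what Storm actually uses.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical factory illustrating the proposal; not Storm's actual code.
class PendingEmitsFactory {
    // Assumed spout capacity, kept at the current 1024 limit.
    static final int SPOUT_PENDING_EMITS_CAPACITY = 1024;

    static <T> Queue<T> newPendingEmits(boolean isBolt) {
        if (isBolt) {
            // Unbounded: add() never throws "Queue full", so a bolt may emit
            // any number of tuples per input; memory is the only limit.
            return new ConcurrentLinkedQueue<>();
        }
        // Bounded: a spout that keeps re-emitting (e.g. from fail()) still fails
        // fast instead of growing the queue until the worker hits an OOME.
        return new ArrayBlockingQueue<>(SPOUT_PENDING_EMITS_CAPACITY);
    }

    public static void main(String[] args) {
        Queue<Integer> boltQueue = newPendingEmits(true);
        for (int i = 0; i < 2048; i++) {
            boltQueue.add(i); // never throws for the unbounded queue
        }
        System.out.println(boltQueue.size()); // prints 2048

        Queue<Integer> spoutQueue = newPendingEmits(false);
        try {
            for (int i = 0; i < 2048; i++) {
                spoutQueue.add(i);
            }
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Queue full"
        }
    }
}
```

The trade-off is the one described above. The bolt's 2048 emits succeed, but its memory use is now bounded only by what the bolt emits. The spout keeps the crash-on-overflow behavior rather than risking unbounded growth.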

> In some cases workers may crash because pendingEmits is full
> ------------------------------------------------------------
>
>                 Key: STORM-3073
>                 URL: https://issues.apache.org/jira/browse/STORM-3073
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Stig Rohde Døssing
>            Priority: Major
>
> Saw this while running the 
> https://github.com/apache/storm/blob/master/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
>  topology.
> {code}
> 2018-05-15 11:35:28.365 o.a.s.u.Utils Thread-16-spout-executor[8, 8] [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: Queue full
>       at org.apache.storm.executor.Executor.accept(Executor.java:282) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:133) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.utils.JCQueue.consume(JCQueue.java:110) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.utils.JCQueue.consume(JCQueue.java:101) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:168) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:157) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.utils.Utils$2.run(Utils.java:349) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
> Caused by: java.lang.IllegalStateException: Queue full
>       at java.util.AbstractQueue.add(AbstractQueue.java:98) ~[?:1.8.0_144]
>       at org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:113) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:516) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.executor.spout.SpoutOutputCollectorImpl.sendSpoutMsg(SpoutOutputCollectorImpl.java:140) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.executor.spout.SpoutOutputCollectorImpl.emit(SpoutOutputCollectorImpl.java:70) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:42) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.loadgen.LoadSpout.fail(LoadSpout.java:135) ~[stormjar.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.executor.spout.SpoutExecutor.failSpoutMsg(SpoutExecutor.java:360) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.executor.spout.SpoutExecutor$1.expire(SpoutExecutor.java:120) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.executor.spout.SpoutExecutor$1.expire(SpoutExecutor.java:113) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.utils.RotatingMap.rotate(RotatingMap.java:63) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.executor.spout.SpoutExecutor.tupleActionFn(SpoutExecutor.java:295) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       at org.apache.storm.executor.Executor.accept(Executor.java:278) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
>       ... 7 more
> {code}
> The executor's pendingEmits queue is full, and the executor then tries to add
> another tuple. It looks to me like we keep the queue from filling up by
> emptying it between calls to nextTuple at
> https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L184.
> The TVL topology re-emits failed tuples directly from the fail method, which
> can be triggered by tick tuples. If the pendingEmits queue is already close
> to full when this happens, we might hit the error above. I think it can also
> happen if nextTuple emits too many tuples in one call, or if too many metrics
> ticks happen between pendingEmits flushes, since metrics ticks also trigger
> emits.
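The sequence described in the issue can be modelled roughly as follows. This is a hypothetical simplification, not Storm's SpoutExecutor. In this model, emits are parked in a bounded pendingEmits queue while the transfer is blocked. The queue is drained only by `flushPendingEmits`, which stands in for the flush the real executor performs between nextTuple calls. A timeout tick calls `fail()`, which re-emits directly, as `LoadSpout.fail` does in the trace. The capacity of 1024 is an assumption.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical, heavily simplified model of the spout executor sequence
// described above; names and structure are illustrative, not Storm's code.
class SpoutExecutorModel {
    final Queue<Integer> pendingEmits;
    boolean downstreamBlocked; // when true, emits cannot be transferred
    int delivered;             // tuples that made it out of the executor

    SpoutExecutorModel(int capacity) {
        pendingEmits = new ArrayBlockingQueue<>(capacity);
    }

    /** Emit path: transfer directly if possible, otherwise park the tuple. */
    void emit(int msgId) {
        if (downstreamBlocked || !pendingEmits.isEmpty()) {
            // Park behind older pending tuples to keep ordering.
            // Throws IllegalStateException("Queue full") at capacity.
            pendingEmits.add(msgId);
        } else {
            delivered++;
        }
    }

    /** Drain pendingEmits; stands in for the flush between nextTuple() calls. */
    void flushPendingEmits() {
        while (!downstreamBlocked && !pendingEmits.isEmpty()) {
            pendingEmits.poll();
            delivered++;
        }
    }

    /** A tick that expires timed-out tuples calls fail() for each of them. */
    void onTimeoutTick(int expiredCount) {
        for (int i = 0; i < expiredCount; i++) {
            fail(i);
        }
    }

    /** Like the TVL spout, re-emit directly from fail(), outside nextTuple(). */
    void fail(int msgId) {
        emit(msgId);
    }

    public static void main(String[] args) {
        SpoutExecutorModel executor = new SpoutExecutorModel(1024);
        executor.downstreamBlocked = true;
        for (int i = 0; i < 1000; i++) {
            executor.emit(i); // queue is now close to full
        }
        try {
            executor.onTimeoutTick(100); // no flush happens between these emits
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " with " + executor.pendingEmits.size() + " pending");
        }
    }
}
```

Running it prints `Queue full with 1024 pending`. The tick's 100 re-emits all land before any flush, and the 25th overflows the queue. Once the transfer unblocks, `flushPendingEmits` drains the queue again.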



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
