[ https://issues.apache.org/jira/browse/STORM-2194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712965#comment-15712965 ]
Sathya F commented on STORM-2194:
---------------------------------

Thanks Craig! We have two problems with 1.0.2, and your patch fixes both of them.

1. When Storm workers start and cannot bind to 56700 (the RMI port), they hang around and do not die. This is easy to reproduce: I started {{nc -l 56700}} and then started the topology. With your patch, the workers die and the supervisor restarts them.

{code}
2016-12-01 04:24:41.721 STDERR [INFO] Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 56700; nested exception is:
2016-12-01 04:24:41.722 STDERR [INFO] java.net.BindException: Address already in use
{code}

2. The Storm workers hit a SocketTimeoutException and do not die. The log:

{code}
2016-11-30 15:21:29.004 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException
	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.daemon.executor$fn__8058$fn__8071$fn__8124.invoke(executor.clj:850) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException
	at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:199) ~[stormjar.jar:1.0.0]
	at org.apache.storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:141) ~[stormjar.jar:1.0.0]
	at org.apache.storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:114) ~[stormjar.jar:1.0.0]
	at org.apache.storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:68) ~[stormjar.jar:1.0.0]
	at org.apache.storm.kafka.trident.TridentKafkaEmitter.access$400(TridentKafkaEmitter.java:42) ~[stormjar.jar:1.0.0]
	at org.apache.storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:265) ~[stormjar.jar:1.0.0]
	at org.apache.storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:257) ~[stormjar.jar:1.0.0]
	at org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:125) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:83) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:110) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:121) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.daemon.executor$fn__8058$tuple_action_fn__8060.invoke(executor.clj:731) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
	... 6 more
2016-11-30 15:21:29.028 o.a.s.d.executor [INFO] Got interrupted excpetion shutting thread down...
2016-11-30 16:20:59.013 s.d.C.DBPoolDataSource-pool-ds [INFO] DBPoolDataSource-pool-ds: Destroyed connection
2016-11-30 16:21:14.014 s.d.C.DBPoolDataSource-pool-ds [INFO] DBPoolDataSource-pool-ds: Destroyed connection
{code}

> ReportErrorAndDie doesn't always die
> ------------------------------------
>
> Key: STORM-2194
> URL: https://issues.apache.org/jira/browse/STORM-2194
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-core
> Affects Versions: 2.0.0, 1.0.2
> Reporter: Craig Hawco
> Attachments: scrubbed-thread-dump.txt
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> I've been trying for some time to track down why some exceptions leave Storm workers in a zombified state. I believe I've isolated the bug to the behaviour of :report-error-and-die/reportErrorAndDie in the executor. Essentially, this code:
> {code}
> :report-error-and-die (fn [error]
>                         (try
>                           ((:report-error <>) error)
>                           (catch Exception e
>                             (log-message "Error while reporting error to cluster, proceeding with shutdown")))
>                         (if (or
>                               (exception-cause? InterruptedException error)
>                               (exception-cause? java.io.InterruptedIOException error))
>                           (log-message "Got interrupted excpetion shutting thread down...")
>                           ((:suicide-fn <>))))
> {code}
> has the grouping of the if statement slightly wrong. It shouldn't either log OR die depending on whether the cause is an InterruptedException/InterruptedIOException; it should log under that condition and ALWAYS die.
> Basically:
> {code}
> :report-error-and-die (fn [error]
>                         (try
>                           ((:report-error <>) error)
>                           (catch Exception e
>                             (log-message "Error while reporting error to cluster, proceeding with shutdown")))
>                         (if (or
>                               (exception-cause? InterruptedException error)
>                               (exception-cause? java.io.InterruptedIOException error))
>                           (log-message "Got interrupted excpetion shutting thread down..."))
>                         ((:suicide-fn <>)))
> {code}
> After digging into the Java port of this code, it looks like a different bug was introduced while porting. This is how it was initially ported:
> {code}
> if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)
>         || Utils.exceptionCauseIsInstanceOf(java.io.InterruptedIOException.class, e)) {
>     LOG.info("Got interrupted exception shutting thread down...");
>     suicideFn.run();
> }
> {code}
> and STORM-2142 changed it to:
> {code}
> if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)
>         || Utils.exceptionCauseIsInstanceOf(java.io.InterruptedIOException.class, e)) {
>     LOG.info("Got interrupted exception shutting thread down...");
> } else {
>     suicideFn.run();
> }
> {code}
> However, I believe the correct port is the one described above:
> {code}
> if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)
>         || Utils.exceptionCauseIsInstanceOf(java.io.InterruptedIOException.class, e)) {
>     LOG.info("Got interrupted exception shutting thread down...");
> }
> suicideFn.run();
> {code}
> I'll look into providing patches for the 1.x and 2.x branches shortly.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
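For comparing the three versions of the branch side by side, here is a minimal Java sketch. It is not Storm's executor code. The class name ReportErrorAndDieSketch and the helpers causeIsInstanceOf and countSuicides are hypothetical. causeIsInstanceOf only imitates what Utils.exceptionCauseIsInstanceOf is described as doing in the issue (walking the cause chain), LOG.info is replaced by System.out, and the suicide function is a counter, so nothing actually exits. The sketch also suggests why problem 2 above leaves a zombie: java.net.SocketTimeoutException is a subclass of java.io.InterruptedIOException, so a timeout buried in the cause chain takes the "interrupted" branch, which matches the "Got interrupted excpetion shutting thread down..." line that follows the stack trace.

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical sketch, not Storm's executor: compares the three versions of the
// report-error-and-die branch. Logging goes to System.out and "suicide" is a counter.
public class ReportErrorAndDieSketch {

    // Walks the cause chain, in the spirit of Utils.exceptionCauseIsInstanceOf (assumed behaviour).
    static boolean causeIsInstanceOf(Class<? extends Throwable> klass, Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (klass.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    // SocketTimeoutException extends InterruptedIOException, so it counts as an "interrupt" here.
    static boolean isInterrupt(Throwable e) {
        return causeIsInstanceOf(InterruptedException.class, e)
                || causeIsInstanceOf(InterruptedIOException.class, e);
    }

    // Initial Java port: dies ONLY on interrupts.
    static void initialPort(Throwable e, Runnable suicideFn) {
        if (isInterrupt(e)) {
            System.out.println("Got interrupted exception shutting thread down...");
            suicideFn.run();
        }
    }

    // 1.0.2 Clojure, and Java after STORM-2142: dies on everything EXCEPT interrupts.
    static void afterStorm2142(Throwable e, Runnable suicideFn) {
        if (isInterrupt(e)) {
            System.out.println("Got interrupted exception shutting thread down...");
        } else {
            suicideFn.run();
        }
    }

    // Proposed fix: log on interrupts, ALWAYS die.
    static void proposedFix(Throwable e, Runnable suicideFn) {
        if (isInterrupt(e)) {
            System.out.println("Got interrupted exception shutting thread down...");
        }
        suicideFn.run();
    }

    // Runs one variant against an error and counts how many times it "dies".
    static int countSuicides(BiConsumer<Throwable, Runnable> variant, Throwable e) {
        AtomicInteger deaths = new AtomicInteger();
        variant.accept(e, deaths::incrementAndGet);
        return deaths.get();
    }

    public static void main(String[] args) {
        // Shaped like the trace above: RuntimeException -> ... -> SocketTimeoutException.
        Throwable timeout = new RuntimeException(
                new RuntimeException(new SocketTimeoutException("read timed out")));
        Throwable plain = new IllegalStateException("boom");

        System.out.println("after STORM-2142, timeout: "
                + countSuicides(ReportErrorAndDieSketch::afterStorm2142, timeout));
        System.out.println("proposed fix, timeout:     "
                + countSuicides(ReportErrorAndDieSketch::proposedFix, timeout));
        System.out.println("initial port, plain error: "
                + countSuicides(ReportErrorAndDieSketch::initialPort, plain));
    }
}
```

Running main should report 0 deaths for the post-STORM-2142 variant on the wrapped timeout, 1 for the proposed fix, and 0 for the initial port on a plain IllegalStateException. In other words, each earlier version leaves a zombie for some class of error, while the proposed ordering always reaches the suicide call. The cause-chain walk does not guard against cyclic cause chains, which is acceptable for a sketch but worth checking in real code.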