[ 
https://issues.apache.org/jira/browse/STORM-2440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Meyer updated STORM-2440:
------------------------------
    Description: 
During two somewhat extended outages of our Kafka cluster, we experienced a 
problem with our Storm topologies consuming data from that Kafka cluster.

Almost all of our topologies silently stopped processing data from some of 
the topics/partitions, and the only way to fix this was to restart 
those topologies.

I tracked down one occurrence of the failure to this worker, which was running 
one of the KafkaSpouts:

{noformat}
2017-03-18 04:06:15.389 o.a.s.k.KafkaUtils [ERROR] Error fetching data from 
[Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic 
[tagging_log]: [NOT_LEADER_FOR_PARTITION]
2017-03-18 04:06:15.389 o.a.s.k.KafkaSpout [WARN] Fetch failed
org.apache.storm.kafka.FailedFetchException: Error fetching data from 
[Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic 
[tagging_log]: [NOT_LEADER_FOR_PARTITION]
        at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:213) 
~[stormjar.jar:?]
        at 
org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) 
~[stormjar.jar:?]
        at 
org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) 
[stormjar.jar:?]
        at 
org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
 [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
[storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2017-03-18 04:06:15.390 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Refreshing 
partition manager connections
2017-03-18 04:06:15.395 o.a.s.k.DynamicBrokersReader [INFO] Read partition info 
from zookeeper: GlobalPartitionInformation{topic=tagging_log, 
partitionMap={0=kafka-03:9092, 1=kafka-12:9092,
 2=kafka-08:9092, 3=kafka-05:9092}}
2017-03-18 04:06:15.395 o.a.s.k.KafkaUtils [INFO] Task [1/1] assigned 
[Partition{host=kafka-03:9092, topic=tagging_log, partition=0}, 
Partition{host=kafka-12:9092, topic=tagging_log, partit
ion=1}, Partition{host=kafka-08:9092, topic=tagging_log, partition=2}, 
Partition{host=kafka-05:9092, topic=tagging_log, partition=3}]
2017-03-18 04:06:15.395 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Deleted 
partition managers: [Partition{host=kafka-08:9092, topic=tagging_log, 
partition=1}]
2017-03-18 04:06:15.396 o.a.s.k.ZkCoordinator [INFO] Task [1/1] New partition 
managers: [Partition{host=kafka-12:9092, topic=tagging_log, partition=1}]
2017-03-18 04:06:15.398 o.a.s.k.PartitionManager [INFO] Read partition 
information from: /log_processing/tagging/kafka-tagging-spout/partition_1  --> 
{"partition":1,"off
set":40567174332,"topology":{"name":"tagging-aerospike-1","id":"tagging-aerospike-1-3-1489587827"},"topic":"tagging_log","broker":{"port":9092,"host":"kafka-08"}}
2017-03-18 04:06:25.408 k.c.SimpleConsumer [INFO] Reconnect due to error:
java.net.SocketTimeoutException
        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) 
~[?:1.8.0_121]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
~[?:1.8.0_121]
        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
~[?:1.8.0_121]
        at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
 ~[stormjar.jar:?]
        at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) 
~[stormjar.jar:?]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) 
~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86) 
[stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
 [stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) 
[stormjar.jar:?]
        at 
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) 
[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) 
[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) 
[stormjar.jar:?]
        at 
org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) 
[stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) 
[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) 
[stormjar.jar:?]
        at 
org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
 [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
[storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2017-03-18 04:06:35.416 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.net.SocketTimeoutException
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) 
~[stormjar.jar:?]
        at 
org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
 ~[storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
[storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketTimeoutException
        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) 
~[?:1.8.0_121]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
~[?:1.8.0_121]
        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
~[?:1.8.0_121]
        at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
 ~[stormjar.jar:?]
        at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) 
~[stormjar.jar:?]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) 
~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) 
~[stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
 ~[stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) 
~[stormjar.jar:?]
        at 
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) 
~[stormjar.jar:?]
        at 
org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) 
~[stormjar.jar:?]
        ... 5 more
2017-03-18 04:06:35.419 o.a.s.d.executor [ERROR] 
java.lang.RuntimeException: java.net.SocketTimeoutException
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) 
~[stormjar.jar:?]
        at 
org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
 ~[storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
[storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketTimeoutException
        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) 
~[?:1.8.0_121]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
~[?:1.8.0_121]
        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
~[?:1.8.0_121]
        at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
 ~[stormjar.jar:?]
        at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) 
~[stormjar.jar:?]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) 
~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) 
~[stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
 ~[stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) 
~[stormjar.jar:?]
        at 
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) 
~[stormjar.jar:?]
        at 
org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) 
~[stormjar.jar:?]
        ... 5 more
2017-03-18 04:06:35.442 o.a.s.d.executor [INFO] Got interrupted excpetion 
shutting thread down...
{noformat}

There was no further output in the log after that until the topology was 
manually killed.

As you can see, the {{java.net.SocketTimeoutException}} escapes the storm-kafka 
code (probably a problem in and of itself), but the worker is not killed. The 
thread that calls the spout's {{.nextTuple}} method, however, does exit.
This is the culprit line: 
https://github.com/apache/storm/blob/v1.1.0/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L270

I see that this has been fixed in the Java port of the executor code by 
explicitly excluding {{java.net.SocketTimeoutException}} from the condition.
I will open a pull request with a backport tomorrow.
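For background (this is my reading, not part of the fix itself): {{java.net.SocketTimeoutException}} is a subclass of {{java.io.InterruptedIOException}}, so an error handler that treats any {{InterruptedIOException}} in the cause chain as a deliberate thread shutdown will mistake a Kafka read timeout for an interruption and let the spout thread die quietly instead of killing the worker. A minimal sketch of the exclusion the Java port applies (class and method names here are illustrative, not Storm's actual API):

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

// Hypothetical sketch of the corrected check; not Storm's actual code.
public class InterruptionCheck {

    /**
     * Returns true only for exceptions that signal a deliberate thread
     * shutdown. SocketTimeoutException extends InterruptedIOException,
     * so it must be excluded explicitly; otherwise a Kafka read timeout
     * looks like an interruption and the spout thread exits silently.
     */
    public static boolean isShutdownInterruption(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof SocketTimeoutException) {
                return false; // a timeout is an error, not a shutdown
            }
            if (cause instanceof InterruptedException
                    || cause instanceof InterruptedIOException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The root of the bug: SocketTimeoutException IS-A InterruptedIOException.
        System.out.println(new SocketTimeoutException() instanceof InterruptedIOException);
        // A wrapped timeout must NOT be treated as a shutdown signal.
        System.out.println(isShutdownInterruption(new RuntimeException(new SocketTimeoutException())));
        // A genuine interruption still is.
        System.out.println(isShutdownInterruption(new InterruptedException()));
    }
}
```

With this exclusion in place, a {{RuntimeException}} wrapping a {{SocketTimeoutException}} (as in the stack trace above) would be reported as an error, causing the worker to be restarted rather than leaving a dead spout thread behind.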

  was:
During two somewhat extended outages of our Kafka cluster, we experienced a 
problem with our Storm topologies consuming data from that Kafka cluster.

Almost all of our topologies silently stopped processing data from some of 
the topics/partitions, and the only way to fix this was to restart 
those topologies.

I tracked down one occurrence of the failure to this worker, which was running 
one of the KafkaSpouts:

{noformat}
2017-03-18 04:06:25.408 k.c.SimpleConsumer [INFO] Reconnect due to error:
java.net.SocketTimeoutException
        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) 
~[?:1.8.0_121]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
~[?:1.8.0_121]
        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
~[?:1.8.0_121]
        at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
 ~[stormjar.jar:?]
        at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) 
~[stormjar.jar:?]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) 
~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86) 
[stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
 [stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) 
[stormjar.jar:?]
        at 
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) 
[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) 
[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) 
[stormjar.jar:?]
        at 
org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) 
[stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) 
[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) 
[stormjar.jar:?]
        at 
org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
 [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
[storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2017-03-18 04:06:35.416 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.net.SocketTimeoutException
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) 
~[stormjar.jar:?]
        at 
org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
 ~[storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
[storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketTimeoutException
        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) 
~[?:1.8.0_121]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
~[?:1.8.0_121]
        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
~[?:1.8.0_121]
        at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
 ~[stormjar.jar:?]
        at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) 
~[stormjar.jar:?]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) 
~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) 
~[stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
 ~[stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) 
~[stormjar.jar:?]
        at 
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) 
~[stormjar.jar:?]
        at 
org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) 
~[stormjar.jar:?]
        ... 5 more
2017-03-18 04:06:35.419 o.a.s.d.executor [ERROR] 
java.lang.RuntimeException: java.net.SocketTimeoutException
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) 
~[stormjar.jar:?]
        at 
org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
 ~[storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
[storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketTimeoutException
        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) 
~[?:1.8.0_121]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
~[?:1.8.0_121]
        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
~[?:1.8.0_121]
        at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
 ~[stormjar.jar:?]
        at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) 
~[stormjar.jar:?]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) 
~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) 
~[stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
 ~[stormjar.jar:?]
        at 
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) 
~[stormjar.jar:?]
        at 
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) 
~[stormjar.jar:?]
        at 
org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) 
~[stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) 
~[stormjar.jar:?]
        ... 5 more
2017-03-18 04:06:35.442 o.a.s.d.executor [INFO] Got interrupted excpetion 
shutting thread down...
{noformat}

There was no further output in the log after that until the topology was 
manually killed.

As you can see, the {{java.net.SocketTimeoutException}} escapes the storm-kafka 
code (probably a problem in and of itself), but the worker is not killed. The 
thread that calls the spout's {{.nextTuple}} method, however, does exit.
This is the culprit line: 
https://github.com/apache/storm/blob/v1.1.0/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L270

I see that this has been fixed in the Java port of the executor code by 
explicitly excluding {{java.net.SocketTimeoutException}} from the condition.
I will open a pull request with a backport tomorrow.


> Kafka outage can lead to lockup of topology
> -------------------------------------------
>
>                 Key: STORM-2440
>                 URL: https://issues.apache.org/jira/browse/STORM-2440
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core, storm-kafka
>    Affects Versions: 0.10.1, 1.0.1, 1.0.2, 1.1.0
>            Reporter: Nico Meyer
>
> During two somewhat extended outages of our Kafka cluster, we experienced a 
> problem with our Storm topologies consuming data from that Kafka cluster.
> Almost all of our topologies silently stopped processing data from some of 
> the topics/partitions, and the only way to fix this was to restart 
> those topologies.
> I tracked down one occurrence of the failure to this worker, which was 
> running one of the KafkaSpouts:
> {noformat}
> 2017-03-18 04:06:15.389 o.a.s.k.KafkaUtils [ERROR] Error fetching data from 
> [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic 
> [tagging_log]: [NOT_LEADER_FOR_PARTITION]
> 2017-03-18 04:06:15.389 o.a.s.k.KafkaSpout [WARN] Fetch failed
> org.apache.storm.kafka.FailedFetchException: Error fetching data from 
> [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic 
> [tagging_log]: [NOT_LEADER_FOR_PARTITION]
>         at 
> org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:213) 
> ~[stormjar.jar:?]
>         at 
> org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) 
> ~[stormjar.jar:?]
>         at 
> org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) 
> ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) 
> [stormjar.jar:?]
>         at 
> org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
>  [storm-core-1.0.2.jar:1.0.2]
>         at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
> [storm-core-1.0.2.jar:1.0.2]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> 2017-03-18 04:06:15.390 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Refreshing 
> partition manager connections
> 2017-03-18 04:06:15.395 o.a.s.k.DynamicBrokersReader [INFO] Read partition 
> info from zookeeper: GlobalPartitionInformation{topic=tagging_log, 
> partitionMap={0=kafka-03:9092, 1=kafka-12:9092,
>  2=kafka-08:9092, 3=kafka-05:9092}}
> 2017-03-18 04:06:15.395 o.a.s.k.KafkaUtils [INFO] Task [1/1] assigned 
> [Partition{host=kafka-03:9092, topic=tagging_log, partition=0}, 
> Partition{host=kafka-12:9092, topic=tagging_log, partit
> ion=1}, Partition{host=kafka-08:9092, topic=tagging_log, partition=2}, 
> Partition{host=kafka-05:9092, topic=tagging_log, partition=3}]
> 2017-03-18 04:06:15.395 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Deleted 
> partition managers: [Partition{host=kafka-08:9092, topic=tagging_log, 
> partition=1}]
> 2017-03-18 04:06:15.396 o.a.s.k.ZkCoordinator [INFO] Task [1/1] New partition 
> managers: [Partition{host=kafka-12:9092, topic=tagging_log, partition=1}]
> 2017-03-18 04:06:15.398 o.a.s.k.PartitionManager [INFO] Read partition 
> information from: /log_processing/tagging/kafka-tagging-spout/partition_1  
> --> {"partition":1,"off
> set":40567174332,"topology":{"name":"tagging-aerospike-1","id":"tagging-aerospike-1-3-1489587827"},"topic":"tagging_log","broker":{"port":9092,"host":"kafka-08"}}
> 2017-03-18 04:06:25.408 k.c.SimpleConsumer [INFO] Reconnect due to error:
> java.net.SocketTimeoutException
>         at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) 
> ~[?:1.8.0_121]
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
> ~[?:1.8.0_121]
>         at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
> ~[?:1.8.0_121]
>         at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
>  ~[stormjar.jar:?]
>         at 
> kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) 
> ~[stormjar.jar:?]
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) 
> ~[stormjar.jar:?]
>         at 
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86) 
> [stormjar.jar:?]
>         at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>  [stormjar.jar:?]
>         at 
> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) 
> [stormjar.jar:?]
>         at 
> kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
>  [stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) 
> [stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) 
> [stormjar.jar:?]
>         at 
> org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) 
> [stormjar.jar:?]
>         at 
> org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) 
> [stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) 
> [stormjar.jar:?]
>         at 
> org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
>  [storm-core-1.0.2.jar:1.0.2]
>         at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
> [storm-core-1.0.2.jar:1.0.2]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> 2017-03-18 04:06:35.416 o.a.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.net.SocketTimeoutException
>         at 
> org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) 
> ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) 
> ~[stormjar.jar:?]
>         at 
> org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
>  ~[storm-core-1.0.2.jar:1.0.2]
>         at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
> [storm-core-1.0.2.jar:1.0.2]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.net.SocketTimeoutException
>         at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) 
> ~[?:1.8.0_121]
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
> ~[?:1.8.0_121]
>         at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
> ~[?:1.8.0_121]
>         at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
>  ~[stormjar.jar:?]
>         at 
> kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) 
> ~[stormjar.jar:?]
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) 
> ~[stormjar.jar:?]
>         at 
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) 
> ~[stormjar.jar:?]
>         at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>  ~[stormjar.jar:?]
>         at 
> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) 
> ~[stormjar.jar:?]
>         at 
> kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
>  ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) 
> ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) 
> ~[stormjar.jar:?]
>         at 
> org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) 
> ~[stormjar.jar:?]
>         at 
> org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) 
> ~[stormjar.jar:?]
>         ... 5 more
> 2017-03-18 04:06:35.419 o.a.s.d.executor [ERROR] 
> java.lang.RuntimeException: java.net.SocketTimeoutException
>         at 
> org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) 
> ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) 
> ~[stormjar.jar:?]
>         at 
> org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
>  ~[storm-core-1.0.2.jar:1.0.2]
>         at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
> [storm-core-1.0.2.jar:1.0.2]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.net.SocketTimeoutException
>         at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) 
> ~[?:1.8.0_121]
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
> ~[?:1.8.0_121]
>         at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
> ~[?:1.8.0_121]
>         at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
>  ~[stormjar.jar:?]
>         at 
> kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) 
> ~[stormjar.jar:?]
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) 
> ~[stormjar.jar:?]
>         at 
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) 
> ~[stormjar.jar:?]
>         at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>  ~[stormjar.jar:?]
>         at 
> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) 
> ~[stormjar.jar:?]
>         at 
> kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
>  ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) 
> ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) 
> ~[stormjar.jar:?]
>         at 
> org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) 
> ~[stormjar.jar:?]
>         at 
> org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) 
> ~[stormjar.jar:?]
>         ... 5 more
> 2017-03-18 04:06:35.442 o.a.s.d.executor [INFO] Got interrupted excpetion 
> shutting thread down...
> {noformat}
> There was no further output in the log after that until the topology was 
> manually killed.
> As you can see, the {{java.net.SocketTimeoutException}} escapes the 
> storm-kafka code (probably a problem in and of itself), but the worker is 
> not killed. The thread that calls the spout's {{.nextTuple}} method, 
> however, does exit.
> This is the culprit line: 
> https://github.com/apache/storm/blob/v1.1.0/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L270
> I see that this has been fixed in the Java port of the executor code by 
> explicitly excluding {{java.net.SocketTimeoutException}} from the condition.
> I will open a pull request with a backport tomorrow.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)