[ https://issues.apache.org/jira/browse/STORM-2440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nico Meyer updated STORM-2440:
------------------------------
    Description: 
During two somewhat extended outages of our Kafka cluster, we experienced a problem with our Storm topologies consuming data from that Kafka cluster. Almost all of our topologies just silently stopped processing data from some of the topics/partitions, and the only way to fix the situation was to restart those topologies.

I tracked down one occurrence of the failure to this worker, which was running one of the KafkaSpouts:

{noformat}
2017-03-18 04:06:15.389 o.a.s.k.KafkaUtils [ERROR] Error fetching data from [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic [tagging_log]: [NOT_LEADER_FOR_PARTITION]
2017-03-18 04:06:15.389 o.a.s.k.KafkaSpout [WARN] Fetch failed
org.apache.storm.kafka.FailedFetchException: Error fetching data from [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic [tagging_log]: [NOT_LEADER_FOR_PARTITION]
        at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:213) ~[stormjar.jar:?]
        at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[stormjar.jar:?]
        at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2017-03-18 04:06:15.390 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Refreshing partition manager connections
2017-03-18 04:06:15.395 o.a.s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=tagging_log, partitionMap={0=kafka-03:9092, 1=kafka-12:9092, 2=kafka-08:9092, 3=kafka-05:9092}}
2017-03-18 04:06:15.395 o.a.s.k.KafkaUtils [INFO] Task [1/1] assigned [Partition{host=kafka-03:9092, topic=tagging_log, partition=0}, Partition{host=kafka-12:9092, topic=tagging_log, partition=1}, Partition{host=kafka-08:9092, topic=tagging_log, partition=2}, Partition{host=kafka-05:9092, topic=tagging_log, partition=3}]
2017-03-18 04:06:15.395 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Deleted partition managers: [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}]
2017-03-18 04:06:15.396 o.a.s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=kafka-12:9092, topic=tagging_log, partition=1}]
2017-03-18 04:06:15.398 o.a.s.k.PartitionManager [INFO] Read partition information from: /log_processing/tagging/kafka-tagging-spout/partition_1  --> {"partition":1,"offset":40567174332,"topology":{"name":"tagging-aerospike-1","id":"tagging-aerospike-1-3-1489587827"},"topic":"tagging_log","broker":{"port":9092,"host":"kafka-08"}}
2017-03-18 04:06:25.408 k.c.SimpleConsumer [INFO] Reconnect due to error:
java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86) [stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [stormjar.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) [stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) [stormjar.jar:?]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) [stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) [stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2017-03-18 04:06:35.416 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.net.SocketTimeoutException
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:?]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:?]
        ... 5 more
2017-03-18 04:06:35.419 o.a.s.d.executor [ERROR] 
java.lang.RuntimeException: java.net.SocketTimeoutException
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:?]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:?]
        ... 5 more
2017-03-18 04:06:35.442 o.a.s.d.executor [INFO] Got interrupted excpetion shutting thread down...
{noformat}

There was no further output in the log after that until the topology was manually killed.

As you can see, the {{java.net.SocketTimeoutException}} escapes the storm-kafka code (probably a problem in and of itself), but the worker is not killed. The thread that calls the spout's {{nextTuple}} method, on the other hand, does exit, so the spout stops consuming.
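The reason the executor mistakes this for a clean shutdown appears to be the JDK class hierarchy: {{java.net.SocketTimeoutException}} extends {{java.io.InterruptedIOException}}, which the executor's error handler treats as an interrupt-triggered shutdown signal. A minimal Java sketch of the corrected check (hypothetical helper, not the actual Storm code, which lives in executor.clj):

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

public class ShutdownCheck {
    // Walk the cause chain: treat InterruptedException / InterruptedIOException
    // as a normal shutdown signal, but NOT SocketTimeoutException, which
    // extends InterruptedIOException yet indicates a network failure that
    // should kill the worker so the supervisor can restart it.
    static boolean isNormalShutdown(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof SocketTimeoutException) {
                return false; // network failure, not a shutdown interrupt
            }
            if (c instanceof InterruptedException || c instanceof InterruptedIOException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // SocketTimeoutException IS-A InterruptedIOException, which is why the
        // unfixed condition swallows it and only the spout thread dies
        System.out.println(InterruptedIOException.class
                .isAssignableFrom(SocketTimeoutException.class)); // true
        System.out.println(isNormalShutdown(
                new RuntimeException(new SocketTimeoutException()))); // false
        System.out.println(isNormalShutdown(new InterruptedException())); // true
    }
}
```

With the unfixed condition (no {{SocketTimeoutException}} exclusion), the wrapped timeout above would be classified as a normal shutdown and the worker would keep running with one dead spout thread.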
This is the culprit line: https://github.com/apache/storm/blob/v1.1.0/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L270

I see that this has already been fixed in the Java port of the executor code by explicitly excluding {{java.net.SocketTimeoutException}} from the condition. I will open a pull request with a backport tomorrow.


> Kafka outage can lead to lockup of topology
> -------------------------------------------
>
>                 Key: STORM-2440
>                 URL: https://issues.apache.org/jira/browse/STORM-2440
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core, storm-kafka
>    Affects Versions: 0.10.1, 1.0.1, 1.0.2, 1.1.0
>            Reporter: Nico Meyer
>

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)