[
https://issues.apache.org/jira/browse/SAMZA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13868188#comment-13868188
]
Jakob Homan commented on SAMZA-101:
-----------------------------------
I've been keeping an eye out for this in my jobs, but have not seen it. It's
likely we'll have to manually kill some brokers to reproduce and fix it from
there.
> Samza task leaking file descriptors on Kafka exceptions
> -------------------------------------------------------
>
> Key: SAMZA-101
> URL: https://issues.apache.org/jira/browse/SAMZA-101
> Project: Samza
> Issue Type: Bug
> Reporter: Alan Li
> Assignee: Rekha Joshi
>
> Initially, my Samza task began seeing many UnresolvedAddressExceptions,
> likely because the Kafka cluster went down and the Samza task kept retrying:
> {noformat}
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating new SimpleConsumer for host kafka-host-12345:10251 for system kafka
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:30)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:480)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
> at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
> Eventually, I began seeing these, which the samza task will never recover
> from:
> {noformat}
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating new SimpleConsumer for host kafka-host-12345:10251 for system kafka
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
> java.net.SocketException: Too many open files
> at sun.nio.ch.Net.socket0(Native Method)
> at sun.nio.ch.Net.socket(Net.java:97)
> at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:84)
> at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:37)
> at java.nio.channels.SocketChannel.open(SocketChannel.java:105)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
> at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
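One possible reading of the two traces above, offered as an assumption and not a confirmed diagnosis: the second trace shows BlockingChannel.connect calling SocketChannel.open (BlockingChannel.scala:48), and the first shows the later connect call in the same method (BlockingChannel.scala:57) throwing UnresolvedAddressException. If the freshly opened channel is not closed when connect throws, every retry would leave one descriptor behind until the process hits "Too many open files". The sketch below shows the general close-on-failure pattern in plain Java NIO. The class and method names (ConnectWithoutLeak, connectOrClose, open) are hypothetical and are not Kafka or Samza code.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

// Hypothetical illustration only; not taken from Kafka or Samza.
final class ConnectWithoutLeak {

    // Connects the given channel. If connect fails for any reason, the
    // channel is closed before the exception propagates, so the file
    // descriptor allocated by SocketChannel.open() is released.
    static void connectOrClose(SocketChannel channel, InetSocketAddress address)
            throws IOException {
        try {
            // Throws the unchecked UnresolvedAddressException when the
            // address could not be resolved, as in the first trace.
            channel.connect(address);
        } catch (IOException | RuntimeException e) {
            channel.close();
            throw e;
        }
    }

    // Opens a new channel and connects it without leaking on failure.
    static SocketChannel open(InetSocketAddress address) throws IOException {
        SocketChannel channel = SocketChannel.open(); // allocates a descriptor
        connectOrClose(channel, address);
        return channel;
    }
}
```

A retry loop built on open() would then hold at most one descriptor per live connection, regardless of how many attempts fail. Whether this matches the actual cause in SAMZA-101 still needs to be confirmed by reproducing the failure.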