[ 
https://issues.apache.org/jira/browse/SAMZA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13877919#comment-13877919
 ] 

Chris Riccomini commented on SAMZA-101:
---------------------------------------

I think the best way to reproduce this is probably to use hello-samza 
(https://github.com/linkedin/hello-samza) and turn the Kafka broker off after 
the job has started. If you let the StreamTask run for a day or longer and you 
see the network file handles for the process start to climb (or you hit the 
out-of-file-handles exception), then you can be sure something is wrong.

If the file handles stay stable, then I'm inclined to close this ticket as 
unable to reproduce.
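A quick way to watch the handle count during that soak test is to poll /proc. This is just a Linux-only sketch (the pid lookup for the container process is left to you; `open_fd_count` is a name I made up):

```python
import os

def open_fd_count(pid: int) -> int:
    """Count a process's open file descriptors via /proc (Linux only)."""
    return len(os.listdir(f"/proc/{pid}/fd"))

# Poll this against the Samza container's pid while the broker is down;
# a steady climb is the leak, a flat line means we can't reproduce.
print(open_fd_count(os.getpid()))
```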

If we can reproduce it, we can talk about the best fix, since we'll be able to 
tell what the problem is.

I'm inclined to think that this isn't fixable, since we call 
simpleConsumer.close immediately before the lines in the log above, but it's 
worth trying to reproduce before we give up on it.
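For what it's worth, if close is somehow not releasing the underlying channel, the leak would look something like the sketch below, with plain sockets standing in for the consumer's connection (hypothetical illustration, not Samza code):

```python
import os
import socket

def fd_count() -> int:
    # Count this process's open descriptors via /proc (Linux only).
    return len(os.listdir(f"/proc/{os.getpid()}/fd"))

baseline = fd_count()

# Leak pattern: each retry opens a fresh channel while the old one is
# dropped without close() -- its descriptor stays open.
dropped = [socket.socket() for _ in range(20)]
assert fd_count() >= baseline + 20

# What a working close gives you: release the old channel before
# recreating, and the count stays flat across retries.
for s in dropped:
    s.close()
assert fd_count() == baseline
```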

> Samza task leaking file descriptors on Kafka exceptions
> -------------------------------------------------------
>
>                 Key: SAMZA-101
>                 URL: https://issues.apache.org/jira/browse/SAMZA-101
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Alan Li
>            Assignee: Rekha Joshi
>
> Initially, my Samza task began seeing many UnresolvedAddressExceptions, 
> likely because the Kafka cluster went down and the Samza task kept retrying:
> {noformat}
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating new SimpleConsumer for host kafka-host-12345:10251 for system kafka
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:30)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:480)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
> at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
> Eventually, I began seeing these, from which the Samza task never 
> recovers:
> {noformat}
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating new SimpleConsumer for host kafka-host-12345:10251 for system kafka
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
> java.net.SocketException: Too many open files
> at sun.nio.ch.Net.socket0(Native Method)
> at sun.nio.ch.Net.socket(Net.java:97)
> at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:84)
> at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:37)
> at java.nio.channels.SocketChannel.open(SocketChannel.java:105)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
> at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
