[ 
https://issues.apache.org/jira/browse/SAMZA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13877404#comment-13877404
 ] 

Rekha Joshi commented on SAMZA-101:
-----------------------------------

I had a brief look. However, a couple of points stand in the way of a quick 
closure:

1. The issue is not easy to reproduce.
2. In a distributed environment, these errors are often reported, and a 
configuration change or restart is often the resolution.
3. There might be a case for resolving "too many open files" by raising the 
ulimit to a larger number, say 32768 (rather than the typical default of 1024).
However, from what I checked, the code says maximum; even so, we could 
explicitly check /proc/sys/fs/file-max, the sysctl configuration, nproc, etc. 
once the error can be reproduced.
4. Assuming the ulimit is sufficient, and given that Kafka can keep going 
down, how can we give the Samza task an automatic way to recover without it 
going into a loop of network exception stack traces?
5. A reasonable solution would be to verify that SimpleConsumer closes 
correctly, perhaps add a timeout monitor at BrokerProxy, and retry after the 
timeout period following these exceptions.
Also, maybe introduce a file threshold monitor and explicitly kill and restart 
the job once a threshold is reached.
-- SimpleConsumer: I checked BrokerProxy, and line 122 does that. From a dry 
run, I am not sure whether I am missing any SimpleConsumer that is left open.
6. Will the timeout and retry, the threshold checks, and starting the task 
again be good enough? Or would recovering from these errors mean recovering 
the state the Samza task was in when Kafka went down?
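
To make the close-then-retry idea from point 5 concrete, here is a rough Java 
sketch. It is not the BrokerProxy code: the Consumer interface, the 
ReconnectingFetcher class and its backoff parameters are hypothetical 
stand-ins for SimpleConsumer and whatever wraps it. It catches Exception 
rather than only IOException because UnresolvedAddressException is an 
unchecked exception.

```java
import java.util.function.Supplier;

/** Sketch only: closes a failed consumer before creating the next one. */
final class ReconnectingFetcher {

    /** Hypothetical stand-in for a consumer that owns a socket. */
    interface Consumer {
        String fetch() throws Exception;
        void close();
    }

    private final Supplier<Consumer> factory;
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private Consumer current; // null when no connection is held

    ReconnectingFetcher(Supplier<Consumer> factory,
                        long initialBackoffMs, long maxBackoffMs) {
        this.factory = factory;
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Tries up to maxAttempts times, sleeping with capped doubling backoff. */
    String fetch(int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (current == null) {
                    current = factory.get();
                }
                return current.fetch();
            } catch (Exception e) {
                last = e;
                // Release the descriptor before any new consumer is created.
                closeQuietly();
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoff);
                backoff = Math.min(backoff * 2, maxBackoffMs);
            }
        }
        throw last;
    }

    private void closeQuietly() {
        if (current != null) {
            try {
                current.close();
            } catch (RuntimeException ignored) {
                // A failing close must not mask the original fetch error.
            }
            current = null;
        }
    }
}
```

The point of the sketch is only the ordering: every failed consumer is closed 
before a replacement is opened, and the retries are bounded and spaced out, so 
a broker outage cannot pile up open sockets in a tight loop.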


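For the file threshold monitor mentioned in point 5, one possible way to read 
the descriptor usage from inside the JVM is sketched below. It assumes a 
HotSpot/OpenJDK-style JVM on a Unix-like host, where the operating system 
MXBean implements com.sun.management.UnixOperatingSystemMXBean. The 
FdThresholdMonitor class and its threshold parameter are hypothetical names 
for illustration; what to do once the threshold is exceeded (log, kill, 
restart) is left out.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import com.sun.management.UnixOperatingSystemMXBean;

/** Sketch only: reports how close the JVM is to its descriptor limit. */
final class FdThresholdMonitor {

    private final double threshold; // fraction of the limit, e.g. 0.8

    FdThresholdMonitor(double threshold) {
        if (threshold <= 0.0 || threshold > 1.0) {
            throw new IllegalArgumentException("threshold must be in (0, 1]");
        }
        this.threshold = threshold;
    }

    /** Returns open/max descriptors, or -1.0 when the JVM cannot report it. */
    static double usage() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            UnixOperatingSystemMXBean unix = (UnixOperatingSystemMXBean) os;
            long max = unix.getMaxFileDescriptorCount();
            if (max > 0) {
                return (double) unix.getOpenFileDescriptorCount() / max;
            }
        }
        return -1.0;
    }

    /** True when a known usage value has reached the threshold. */
    boolean exceeded(double usage) {
        return usage >= threshold;
    }
}
```

A periodic check such as new FdThresholdMonitor(0.8).exceeded(FdThresholdMonitor.usage()) 
would at least surface a leak before the hard limit is hit.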
> Samza task leaking file descriptors on Kafka exceptions
> -------------------------------------------------------
>
>                 Key: SAMZA-101
>                 URL: https://issues.apache.org/jira/browse/SAMZA-101
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Alan Li
>            Assignee: Rekha Joshi
>
> Initially, my samza task began seeing many UnresolvedAddressExceptions, 
> likely because the kafka cluster went down and the samza task is retrying:
> {noformat}
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating 
> new SimpleConsumer for host kafka-host-12345:10251 for system kafka
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating 
> simple consumer and retrying connection
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace 
> for fetchMessages exception.
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:30)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:480)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
> at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at 
> org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
> at 
> org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
> at 
> org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
> at 
> org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
> Eventually, I began seeing these, which the samza task will never recover 
> from:
> {noformat}
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating 
> simple consumer and retrying connection
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating 
> new SimpleConsumer for host kafka-host-12345:10251 for system kafka
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace 
> for fetchMessages exception.
> java.net.SocketException: Too many open files
> at sun.nio.ch.Net.socket0(Native Method)
> at sun.nio.ch.Net.socket(Net.java:97)
> at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:84)
> at 
> sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:37)
> at java.nio.channels.SocketChannel.open(SocketChannel.java:105)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
> at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at 
> org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
> at 
> org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
> at 
> org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
> at 
> org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
