[
https://issues.apache.org/jira/browse/SAMZA-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13877404#comment-13877404
]
Rekha Joshi commented on SAMZA-101:
-----------------------------------
I did have a brief look. However, a couple of points stand in the way of a quick closure. Below -
1. The issue is not easily reproducible.
2. In a distributed environment these errors are reported often, and a
configuration change or restart is usually the resolution.
3. There might be a case for resolving "too many open files" by raising ulimit
to a larger number, say 32768 (versus the typical default of 1024). However,
from what I checked the code already respects that maximum; still, we could
explicitly check /proc/sys/fs/file-max, the sysctl configuration, nproc, etc.
once the error can be reproduced (see the descriptor-count sketch after this list).
4. AFAIK, even when the ulimit is sufficient, given that Kafka can keep going
down, how can the Samza task recover automatically without looping on the
network-exception stack trace?
5. The proposed solution is to verify that the SimpleConsumer is closed
correctly, perhaps add a timeout monitor in BrokerProxy, and retry after the
timeout period once these exceptions occur.
Also, maybe introduce a file-descriptor threshold monitor and explicitly kill
and restart the job once a threshold is reached (see the sketches after this list).
-- SimpleConsumer: I checked BrokerProxy, and line 122 does close it; I am not
sure whether a dry run would show any SimpleConsumer still left open.
6. Will that timeout-and-retry, plus threshold checks and re-initiating the
task, be good enough? Or would recovering from these errors mean recovering the
state the Samza task was in when Kafka went down?
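
For the descriptor-threshold idea in points 3 and 5, here is a minimal sketch of
what such a monitor could read, assuming a HotSpot/OpenJDK JVM where
com.sun.management.UnixOperatingSystemMXBean is available. The object and method
names (FdThresholdMonitor, descriptorUsage, overThreshold) are hypothetical, not
anything that exists in Samza today:
{code:scala}
import java.lang.management.ManagementFactory
import com.sun.management.UnixOperatingSystemMXBean

object FdThresholdMonitor {
  // Returns (open, max) file-descriptor counts when the JVM exposes them.
  def descriptorUsage(): Option[(Long, Long)] =
    ManagementFactory.getOperatingSystemMXBean match {
      case os: UnixOperatingSystemMXBean =>
        Some((os.getOpenFileDescriptorCount, os.getMaxFileDescriptorCount))
      case _ => None // non-Unix JVMs do not expose descriptor counts
    }

  // True when open descriptors exceed the given fraction of the hard limit.
  def overThreshold(fraction: Double = 0.9): Boolean =
    descriptorUsage().exists { case (open, max) => open > max * fraction }
}
{code}
A BrokerProxy retry loop could consult overThreshold() before recreating a
SimpleConsumer and fail fast (letting the job be restarted) instead of looping
on the exception.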
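And for the close-before-recreate part of point 5, a rough sketch of the
pattern only, not the actual BrokerProxy code; the wrapper object and the
SimpleConsumer constructor arguments (timeout, buffer size, client id) are
illustrative placeholders:
{code:scala}
import kafka.consumer.SimpleConsumer

object ConsumerRecreate {
  // Close-before-recreate: release the failed consumer's socket (and file
  // descriptor) before building a replacement, so repeated retries cannot
  // exhaust the ulimit even while the broker stays unreachable.
  def recreate(failed: SimpleConsumer, host: String, port: Int): SimpleConsumer = {
    try { failed.close() } catch { case _: Throwable => } // ignore close errors
    new SimpleConsumer(host, port, 30000, 64 * 1024, "samza-broker-proxy")
  }
}
{code}
If BrokerProxy line 122 already does this on every retry path, the sketch only
confirms the intent; the open question is whether any path recreates the
consumer without closing the old one first.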
> Samza task leaking file descriptors on Kafka exceptions
> -------------------------------------------------------
>
> Key: SAMZA-101
> URL: https://issues.apache.org/jira/browse/SAMZA-101
> Project: Samza
> Issue Type: Bug
> Reporter: Alan Li
> Assignee: Rekha Joshi
>
> Initially, my samza task began seeing many UnresolvedAddressExceptions,
> likely because the kafka cluster went down and the samza task was retrying:
> {noformat}
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating new SimpleConsumer for host kafka-host-12345:10251 for system kafka
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
> 2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:30)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:480)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
> at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
> Eventually, I began seeing these, which the samza task will never recover
> from:
> {noformat}
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating new SimpleConsumer for host kafka-host-12345:10251 for system kafka
> 2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
> java.net.SocketException: Too many open files
> at sun.nio.ch.Net.socket0(Native Method)
> at sun.nio.ch.Net.socket(Net.java:97)
> at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:84)
> at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:37)
> at java.nio.channels.SocketChannel.open(SocketChannel.java:105)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
> at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
> at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.1.5#6160)