Hi Yan,

That worked, thanks!

Davide

> On 6 Jun 2015, at 12:42 am, Yan Fang <yanfang...@gmail.com> wrote:
> 
> Hi Davide,
> 
> The consumer uses that memory because it maintains a buffer of prefetched 
> messages; by default it holds up to 50,000 messages. In your case that can 
> be as large as 50,000 * 12 KiB (roughly 600 MiB). See 
> systems.system-name.samza.fetch.threshold in 
> http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html
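> 
> As a rough sketch, lowering that threshold bounds the buffer. The system 
> name "kafka" and the value below are assumptions, not your actual config; 
> size it to your heap (e.g. 10,000 messages * 12 KiB is about 120 MiB):
> 
>   # cap the number of messages Samza buffers for this system
>   systems.kafka.samza.fetch.threshold=10000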
> 
> "if I slow down the task (for example by skipping CPU-intensive computation)"
> 
> When you skip some computation, shouldn't the task be faster? If that's the 
> case, I suspect the task processes messages so fast that the consumer never 
> reaches the 50,000-message threshold. But I haven't verified that.
> 
> Thanks,
> 
> Yan Fang
> 
>> On Jun 5, 2015, at 3:50 AM, Davide Simoncelli <netcelli....@gmail.com> wrote:
>> 
>> Hello,
>> 
>> I have memory issues when running a task on the cluster. The Samza 
>> application performs pattern matching on received messages and sends the 
>> results to another Kafka topic.
>> I tried to put some pressure on the system by sending messages with an 
>> average size of 12 KiB. After a while the containers crash because no heap 
>> memory is left, or with java.lang.OutOfMemoryError: GC overhead limit exceeded.
>> For example:
>> 
>> java.lang.OutOfMemoryError: Java heap space
>>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[na:1.8.0_45]
>>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_45]
>>   at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48) ~[samza-kafka_2.10-0.9.0.jar:na]
>>   at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41) ~[samza-kafka_2.10-0.9.0.jar:na]
>>   at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:176) ~[samza-kafka_2.10-0.9.0.jar:na]
>>   at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:146) ~[samza-kafka_2.10-0.9.0.jar:na]
>>   at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:133) ~[samza-kafka_2.10-0.9.0.jar:na]
>>   at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82) ~[samza-core_2.10-0.9.0.jar:0.9.0]
>>   at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:132) ~[samza-kafka_2.10-0.9.0.jar:na]
>>   at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_45]
>> 
>> java.lang.OutOfMemoryError: Java heap space
>>   at kafka.utils.Utils$.readBytes(Utils.scala:127) ~[kafka_2.10-0.8.2.0.jar:na]
>>   at kafka.utils.Utils$.readBytes(Utils.scala:121) ~[kafka_2.10-0.8.2.0.jar:na]
>> 
>> 
>> So I instructed each container to generate a heap dump on crash. Inspecting 
>> the dumps shows that the system consumer used a lot of memory:
>> 
>> http://imgur.com/W7i0dn6
>> http://imgur.com/2CTphAk
>> 
>> Why does the system consumer use so much memory? I noticed that if I slow 
>> down the task (for example by skipping CPU-intensive computation), the 
>> consumers don’t crash. Is that because the garbage collector then has more 
>> time to reclaim unused memory?
>> 
>> Regards
>> 
>> Davide
