Hi Yan, that worked, thanks!
Davide

> On 6 Jun 2015, at 12:42 am, Yan Fang <yanfang...@gmail.com> wrote:
>
> Hi Davide,
>
> The consumer takes up memory because it maintains a cache of messages,
> which holds 50000 messages by default. In your case that can be as big as
> 50000 * 12KiB. See systems.system-name.samza.fetch.threshold in
> http://samza.apache.org/learn/documentation/0.9/jobs/configuration-table.html
>
> "if I slow down the task (for example by skipping CPU-intensive computation)"
>
> When you skip some computation, shouldn't the task be faster? If that's
> the case, I suspect the task processes messages so fast that the consumer
> never reaches the 50000 threshold. But I haven't verified it.
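>
> If you want to lower it, something along these lines in the job properties
> should work. This is just a sketch: I'm assuming your system is named
> "kafka", and the value is only an illustration, so tune it to your message
> size:
>
>   # default is 50000 buffered messages, i.e. up to ~600MB at ~12KiB each;
>   # 5000 would keep the cache around ~60MB
>   systems.kafka.samza.fetch.threshold=5000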
>
> Thanks,
>
> Yan Fang
>
>> On Jun 5, 2015, at 3:50 AM, Davide Simoncelli <netcelli....@gmail.com> wrote:
>>
>> Hello,
>>
>> I have memory issues when running a task on the cluster. The Samza
>> application performs pattern matching on received messages and sends the
>> results to another Kafka topic.
>> I tried to put some pressure on the system by sending messages with an
>> average size of 12KiB. After a while the containers crash because no heap
>> memory is left, or with java.lang.OutOfMemoryError: GC overhead limit
>> exceeded. For example:
>>
>> java.lang.OutOfMemoryError: Java heap space
>>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[na:1.8.0_45]
>>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_45]
>>     at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48) ~[samza-kafka_2.10-0.9.0.jar:na]
>>     at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41) ~[samza-kafka_2.10-0.9.0.jar:na]
>>     at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:176) ~[samza-kafka_2.10-0.9.0.jar:na]
>>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:146) ~[samza-kafka_2.10-0.9.0.jar:na]
>>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:133) ~[samza-kafka_2.10-0.9.0.jar:na]
>>     at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82) ~[samza-core_2.10-0.9.0.jar:0.9.0]
>>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:132) ~[samza-kafka_2.10-0.9.0.jar:na]
>>     at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_45]
>>
>> java.lang.OutOfMemoryError: Java heap space
>>     at kafka.utils.Utils$.readBytes(Utils.scala:127) ~[kafka_2.10-0.8.2.0.jar:na]
>>     at kafka.utils.Utils$.readBytes(Utils.scala:121) ~[kafka_2.10-0.8.2.0.jar:na]
>>
>> So I instructed each container to generate a heap dump on crash. By
>> inspecting the dumps I can see that the system consumer used a lot of
>> memory:
>>
>> http://imgur.com/W7i0dn6
>> http://imgur.com/2CTphAk
>>
>> Why does the system consumer use so much memory? I noticed that if I slow
>> down the task (for example by skipping CPU-intensive computation) the
>> consumers don't crash. Is that because the garbage collector has more time
>> to reclaim unused memory?
>>
>> Regards
>>
>> Davide
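P.S. for anyone who finds this thread later: lowering the fetch threshold
only helps if the container heap can also hold whatever is still buffered.
If I read the 0.9 configuration table right, the YARN container size and the
JVM heap are set separately, roughly like this (the values here are only
illustrative):

  # memory to request per YARN container, in MB (default 1024)
  yarn.container.memory.mb=2048
  # JVM options passed to each container, e.g. the heap ceiling
  task.opts=-Xmx1536M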