Hello,

I am running into memory issues when running a task on the cluster. The Samza 
application performs pattern matching on received messages and sends the 
results to another Kafka topic.
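For context, the task is structurally similar to this minimal sketch (not the 
actual code; the pattern, the output topic name, and the String payloads are 
placeholders):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class PatternMatchTask implements StreamTask {
  // Placeholder regex and output topic; the real job uses its own.
  private static final Pattern PATTERN = Pattern.compile("ERROR\\s+(\\w+)");
  private static final SystemStream OUTPUT = new SystemStream("kafka", "matches");

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // Assumes a string serde on the input stream.
    String message = (String) envelope.getMessage();
    Matcher matcher = PATTERN.matcher(message);
    while (matcher.find()) {
      // Forward each match to the output Kafka topic.
      collector.send(new OutgoingMessageEnvelope(OUTPUT, matcher.group(1)));
    }
  }
}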
To put some pressure on the system, I sent messages with an average size of 
12 KiB. After a while the containers crash with java.lang.OutOfMemoryError: 
Java heap space or java.lang.OutOfMemoryError: GC overhead limit exceeded.
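The load was generated along these lines (a rough sketch; the broker address, 
topic name, and random payloads are stand-ins for my actual generator):

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LoadGenerator {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
    Random random = new Random();
    byte[] payload = new byte[12 * 1024]; // ~12 KiB per message

    while (true) {
      random.nextBytes(payload);
      producer.send(new ProducerRecord<String, byte[]>("input-topic", payload));
    }
  }
}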
Two of the stack traces, for example:

java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[na:1.8.0_45]
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_45]
        at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) ~[kafka_2.10-0.8.2.0.jar:na]
        at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48) ~[samza-kafka_2.10-0.9.0.jar:na]
        at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41) ~[samza-kafka_2.10-0.9.0.jar:na]
        at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:176) ~[samza-kafka_2.10-0.9.0.jar:na]
        at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:146) ~[samza-kafka_2.10-0.9.0.jar:na]
        at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:133) ~[samza-kafka_2.10-0.9.0.jar:na]
        at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82) ~[samza-core_2.10-0.9.0.jar:0.9.0]
        at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:132) ~[samza-kafka_2.10-0.9.0.jar:na]
        at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_45]

java.lang.OutOfMemoryError: Java heap space
        at kafka.utils.Utils$.readBytes(Utils.scala:127) ~[kafka_2.10-0.8.2.0.jar:na]
        at kafka.utils.Utils$.readBytes(Utils.scala:121) ~[kafka_2.10-0.8.2.0.jar:na]


So I instructed each container to generate a heap dump on crash. Inspecting 
the dumps shows that the system consumer used a lot of memory:

http://imgur.com/W7i0dn6
http://imgur.com/2CTphAk
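
For reference, the dumps were enabled with container JVM options roughly like 
this in the job configuration (the dump path is a placeholder):

# task.opts passes JVM options to each Samza container,
# so it writes a heap dump when an OOM error is thrown.
task.opts=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dumps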

Why does the system consumer use so much memory? I noticed that if I slow the 
task down (for example, by skipping the CPU-intensive computation), the 
consumers don't crash. Is that because the garbage collector then has enough 
time to reclaim unused memory?

Regards

Davide
