Hey there,

We have a Samza job running in YARN that reads from Kafka and writes to Cassandra. The job is also configured to write its metrics to a Kafka topic, and the changelogs for the RocksDB stores and the checkpointed offsets are written to Kafka as well.
Containers for this job keep failing in YARN. The job has a total of 4 containers, but one container fails and every replacement container that YARN allocates fails as well. Here is the stack trace from one of the failed containers:

2016-11-18 22:01:10,930 62058 [main] ERROR o.a.s.s.kafka.KafkaSystemProducer - Unable to send message from TaskName-Partition 6 to system kafka
2016-11-18 22:01:10,943 62071 [main] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 6 to system kafka.
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:152)
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
    at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
    at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
    at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:136)
    at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
    at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
    at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:70)
    at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:182)
    at org.apache.samza.container.RunLoop$$anonfun$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:162)
    at org.apache.samza.container.RunLoop$$anonfun$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:162)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.RunLoop$$anonfun$commit$1.apply$mcVJ$sp(RunLoop.scala:162)
    at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
    at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:35)
    at org.apache.samza.container.RunLoop.commit(RunLoop.scala:157)
    at org.apache.samza.container.RunLoop.run(RunLoop.scala:76)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

We tried turning off the metrics being written to Kafka, but the problem persists.
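For context, given the RecordTooLargeException and the producer's max.request.size of 1048576 bytes in the config dump below, we assume a workaround would be to raise the client-side limit (and the matching broker-side limit). This is only a sketch of the override we are considering, with property names taken from the Samza and Kafka docs and the system name "kafka" assumed to match our job config:

```properties
# Sketch only -- assumes our Kafka system is named "kafka" in the Samza job config.
# Samza forwards systems.<system-name>.producer.* properties to the underlying
# Kafka producer, so this should raise the client-side request limit from the
# default 1048576 (1 MB) to 2 MB:
systems.kafka.producer.max.request.size=2097152

# The broker must also be willing to accept larger messages; that is a
# broker-side setting in server.properties, e.g.:
#   message.max.bytes=2097152
```

We have not verified this yet, so corrections are welcome if this is not the right knob.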
We enabled 'trace'-level logging for the 'org.apache.samza' package and found the following additional info:

2016-11-18 22:01:10,323 61451 [main] INFO o.a.k.c.producer.ProducerConfig - ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    acks = all
    batch.size = 16384
    reconnect.backoff.ms = 10
    bootstrap.servers = [usw2a-daalt-an-kaf-int5.prsn.us:9092, usw2b-daalt-an-kaf-int5.prsn.us:9092, usw2c-daalt-an-kaf-int5.prsn.us:9092]
    receive.buffer.bytes = 32768
    retry.backoff.ms = 100
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    retries = 2147483647
    max.request.size = 1048576
    block.on.buffer.full = true
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    metrics.sample.window.ms = 30000
    send.buffer.bytes = 131072
    max.in.flight.requests.per.connection = 1
    metrics.num.samples = 2
    linger.ms = 0
    client.id = samza_checkpoint_manager-course_section_analytics-1-1479506410048-6

This leads us to believe that the job is still producing metrics to Kafka, despite our having disabled metrics reporting. Any help from you guys would be much appreciated. Have a nice weekend!

Thanks,
Suraj Choudhary