Hi Suraj,

A couple of questions:
1. Do you write your output to Kafka at all? For example, is it possible
for a message you write to an output topic to be larger than 1 MB?

2. Is it possible that an entry written to the changelog is larger than
1 MB? (Possibly because a > 1 MB write to the local RocksDB ended up
writing that message to the changelog.) If this is the cause, you should
be able to reproduce it easily; see the first sketch below for the size
limits involved.

3. Just to confirm: you can explicitly disable metrics reporters via the
`metrics.reporters` property. Also remove all configs that refer to
org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory; the
second sketch below shows what that might look like.
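For reference on the 1 MB figure: the producer config in your log shows
max.request.size = 1048576, which is the client-side cap, and the broker
enforces its own message.max.bytes on top of that. If oversized changelog
or output messages turn out to be the cause, here is a minimal sketch of
the properties involved (the 2 MB value and the system name "kafka" are
illustrative only, and the broker/topic limits have to be raised in step,
or you can instead shrink the messages):

    # Client side: Samza passes systems.<system>.producer.* through to
    # the Kafka producer, so this raises max.request.size for the system
    # named "kafka". The 2097152 (2 MB) value is illustrative only.
    systems.kafka.producer.max.request.size=2097152

    # Broker side (server.properties), and/or the per-topic
    # max.message.bytes, must be raised to match:
    # message.max.bytes=2097152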
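On point 3, a sketch of what the metrics section of your job's
.properties might look like once reporting is disabled. The reporter
name "snapshot" is an assumption here; match whatever name your config
actually uses:

    # Leave the reporter list empty so no metrics are published:
    metrics.reporters=

    # ...and remove any reporter definitions along these lines:
    # metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
    # metrics.reporter.snapshot.stream=kafka.metrics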
On Fri, Nov 18, 2016 at 2:53 PM, Suraj Choudhary <surajc0...@gmail.com> wrote:

> Hey there,
>
> We have a Samza job running in YARN that reads from Kafka and writes to
> Cassandra. The job also writes its metrics to a Kafka topic, and the
> changelogs for the RocksDB stores and the checkpointed offsets are
> written to Kafka as well.
>
> We are seeing failed containers in YARN for this job. The job has a
> total of 4 containers, but one container fails and the new containers
> that YARN allocates keep failing too.
>
> Here is the stack trace from one of the failed containers:
>
> 2016-11-18 22:01:10,930 62058 [main] ERROR o.a.s.s.kafka.KafkaSystemProducer - Unable to send message from TaskName-Partition 6 to system kafka
> 2016-11-18 22:01:10,943 62071 [main] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
> org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 6 to system kafka.
>   at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:152)
>   at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
>   at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
>   at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>   at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
>   at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:136)
>   at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
>   at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>   at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
>   at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:70)
>   at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:182)
>   at org.apache.samza.container.RunLoop$$anonfun$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:162)
>   at org.apache.samza.container.RunLoop$$anonfun$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:162)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>   at org.apache.samza.container.RunLoop$$anonfun$commit$1.apply$mcVJ$sp(RunLoop.scala:162)
>   at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
>   at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:35)
>   at org.apache.samza.container.RunLoop.commit(RunLoop.scala:157)
>   at org.apache.samza.container.RunLoop.run(RunLoop.scala:76)
>   at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
>   at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
>   at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
>   at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
>
> We tried turning off the metrics written to Kafka but are still seeing
> the problem. We enabled 'trace'-level logging for the 'org.apache.samza'
> package and found the following additional info:
>
> 2016-11-18 22:01:10,323 61451 [main] INFO o.a.k.c.producer.ProducerConfig - ProducerConfig values:
>   compression.type = none
>   metric.reporters = []
>   metadata.max.age.ms = 300000
>   metadata.fetch.timeout.ms = 60000
>   acks = all
>   batch.size = 16384
>   reconnect.backoff.ms = 10
>   bootstrap.servers = [usw2a-daalt-an-kaf-int5.prsn.us:9092, usw2b-daalt-an-kaf-int5.prsn.us:9092, usw2c-daalt-an-kaf-int5.prsn.us:9092]
>   receive.buffer.bytes = 32768
>   retry.backoff.ms = 100
>   buffer.memory = 33554432
>   timeout.ms = 30000
>   key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>   retries = 2147483647
>   max.request.size = 1048576
>   block.on.buffer.full = true
>   value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>   metrics.sample.window.ms = 30000
>   send.buffer.bytes = 131072
>   max.in.flight.requests.per.connection = 1
>   metrics.num.samples = 2
>   linger.ms = 0
>   client.id = samza_checkpoint_manager-course_section_analytics-1-1479506410048-6
>
> This leads us to believe that the job is still producing metrics to
> Kafka in spite of our disabling them.
>
> Any help from you guys would be much appreciated. Have a nice weekend!
>
> Thanks,
> Suraj Choudhary

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University