Hi Jagadish,

Thanks for your response.

*Question 1*:  Do you write your output to kafka at all? (For example, is
it possible for a message you write to an output topic to be more than 1M?)

Yes, we write changelogs for the RocksDB stores, metrics for monitoring
purposes, and offset checkpoints to different Kafka topics.
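As a quick sanity check on this point, here is a standalone sketch (plain Python, not part of our job; the 1048576-byte limit is Kafka's producer `max.request.size` default) for flagging values that would exceed the limit before they are sent:

```python
# Standalone sketch: check whether a serialized value would exceed
# Kafka's default max.request.size (1048576 bytes) before writing it.
DEFAULT_MAX_REQUEST_SIZE = 1048576  # Kafka producer default, 1 MB

def exceeds_kafka_limit(value: bytes, limit: int = DEFAULT_MAX_REQUEST_SIZE) -> bool:
    """Return True if the raw value alone is already over the limit.

    The real producer also counts key bytes and record overhead, so this
    is a conservative lower bound, not an exact check.
    """
    return len(value) > limit

small = b"x" * 1024               # 1 KB: fine
big = b"x" * (2 * 1024 * 1024)    # 2 MB: would trigger RecordTooLargeException
print(exceeds_kafka_limit(small))  # False
print(exceeds_kafka_limit(big))    # True
```

Note this only bounds the value size; the broker-side check also includes key and record overhead.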

*Question 2*: Is it possible that an entry written to the changelog is more
than 1M? (Possibly, because you had a > 1M write to the local rocksdb, that
ended up writing the message to the changelog). You can re-produce this
easily (if this is the cause).

We disabled the changelogs for all the RocksDB stores and the containers
did not fail. So it seems some of our changelog entries are bigger than 1MB.
What do you recommend in such a situation?
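In case it helps: if raising the limits turns out to be the accepted fix, the knobs involved would presumably be the producer-side `max.request.size` and the broker/topic-side message size. A sketch of what we are considering (the 2097152 value is just an illustrative 2 MB, not a recommendation; the topic name and `<zk>` placeholder are hypothetical):

```properties
# Samza passes systems.<system>.producer.* through to the Kafka producer,
# so this should raise the producer-side limit from the 1048576 default:
systems.kafka.producer.max.request.size=2097152

# The broker must also accept the larger messages (server.properties):
# message.max.bytes=2097152
#
# Or, per changelog topic only:
# bin/kafka-configs.sh --zookeeper <zk> --alter --entity-type topics \
#   --entity-name my-changelog-topic --add-config max.message.bytes=2097152
```

The consumer-side fetch size would also need to cover the largest message, otherwise restores from the changelog could stall.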

*Question 3*: Just to confirm, you can explicitly disable metrics reporters
by the `metrics.reporters` property. Also, remove all configs that refer to
org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory.

Yes, we commented out all the metrics-related properties to rule out the
possibility that the metrics reporter was writing anything > 1MB.
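For completeness, in case commenting things out missed anything, an explicit way to disable the reporters in the job properties (the reporter name `snapshot` below is just an example of how one might have been registered) would be:

```properties
# An empty reporters list means no metrics reporter is instantiated:
metrics.reporters=

# And make sure no factory definitions remain, e.g. remove lines like:
# metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
# metrics.reporter.snapshot.stream=kafka.metrics
```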

Have a great day!

Thanks,
Suraj Choudhary

On Fri, Nov 18, 2016 at 8:03 PM, Jagadish Venkatraman <
jagadish1...@gmail.com> wrote:

> Hi Suraj,
>
> A couple of questions:
>
> 1. Do you write your output to kafka at all? (For example, is it possible
> for a message you write to an output topic to be more than 1M?)
> 2. Is it possible that an entry written to the changelog is more than 1M?
> (Possibly, because you had a > 1M write to the local rocksdb, that ended up
> writing the message to the changelog). You can re-produce this easily (if
> this is the cause).
> 3. Just to confirm, you can explicitly disable metrics reporters by the
> `metrics.reporters` property. Also, remove all configs that refer to
> org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory.
>
>
>
>
> On Fri, Nov 18, 2016 at 2:53 PM, Suraj choudhary <surajc0...@gmail.com>
> wrote:
>
> > Hey there,
> >
> > We are having a Samza job running in YARN which reads from Kafka and
> writes
> > to Cassandra. We are also enabling the job to write the metrics to a
> Kafka
> > topic, the changelogs for the RocksDB stores  and checkpointing of
> offsets
> > are also written to Kafka.
> >
> > We are having failed containers in YARN for this job. The job is having a
> > total of 4 containers but one container fails and the newly allocated
> > containers by YARN keep failing.
> >
> > Here is the stack trace from one of the failed containers:
> >
> > 2016-11-18 22:01:10,930 62058 [main] ERROR o.a.s.s.kafka.KafkaSystemProducer - Unable to send message from TaskName-Partition 6 to system kafka
> > 2016-11-18 22:01:10,943 62071 [main] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
> > org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 6 to system kafka.
> >         at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:152)
> >         at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
> >         at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
> >         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >         at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
> >         at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:136)
> >         at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
> >         at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >         at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
> >         at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:70)
> >         at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:182)
> >         at org.apache.samza.container.RunLoop$$anonfun$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:162)
> >         at org.apache.samza.container.RunLoop$$anonfun$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:162)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >         at org.apache.samza.container.RunLoop$$anonfun$commit$1.apply$mcVJ$sp(RunLoop.scala:162)
> >         at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
> >         at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:35)
> >         at org.apache.samza.container.RunLoop.commit(RunLoop.scala:157)
> >         at org.apache.samza.container.RunLoop.run(RunLoop.scala:76)
> >         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
> >         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
> >         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
> >         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
> >
> > We tried turning off the metrics written to Kafka but are still seeing
> > the problem. We enabled 'trace' level logging for the 'org.apache.samza'
> > package and found the following additional info:
> >
> > 2016-11-18 22:01:10,323 61451 [main] INFO  o.a.k.c.producer.ProducerConfig - ProducerConfig values:
> >         compression.type = none
> >         metric.reporters = []
> >         metadata.max.age.ms = 300000
> >         metadata.fetch.timeout.ms = 60000
> >         acks = all
> >         batch.size = 16384
> >         reconnect.backoff.ms = 10
> >         bootstrap.servers = [usw2a-daalt-an-kaf-int5.prsn.us:9092, usw2b-daalt-an-kaf-int5.prsn.us:9092, usw2c-daalt-an-kaf-int5.prsn.us:9092]
> >         receive.buffer.bytes = 32768
> >         retry.backoff.ms = 100
> >         buffer.memory = 33554432
> >         timeout.ms = 30000
> >         key.serializer = class
> > org.apache.kafka.common.serialization.ByteArraySerializer
> >         retries = 2147483647
> >         max.request.size = 1048576
> >         block.on.buffer.full = true
> >         value.serializer = class
> > org.apache.kafka.common.serialization.ByteArraySerializer
> >         metrics.sample.window.ms = 30000
> >         send.buffer.bytes = 131072
> >         max.in.flight.requests.per.connection = 1
> >         metrics.num.samples = 2
> >         linger.ms = 0
> >         client.id = samza_checkpoint_manager-course_section_analytics-1-1479506410048-6
> >
> >
> > This leads us to believe that the job is still producing the metrics in
> > spite of us disabling the metrics writing to Kafka.
> >
> > Any help from you guys would be much appreciated. Have a nice weekend!
> >
> >
> > Thanks,
> > Suraj Choudhary
> >
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>
