Guozhang and Yan,

Thank you both for your responses.  I tried a lot of combinations and I
think I've determined that it's new producer + snappy that causes the issue.

It never happens with the old producer and it never happens with lz4 or no
compression.  It only happens when a broker gets restarted (or maybe just
shut down).
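
For anyone trying to reproduce the comparison above, here is a minimal sketch of the two producer setups using only `java.util.Properties`. The keys `bootstrap.servers` and `compression.type` are the new producer's standard setting names; the class name, helper name, and the `localhost:9092` address are placeholders invented for this illustration. In a Samza job the same settings would presumably be passed through under the `systems.kafka.producer.` prefix, which is an assumption rather than something stated in this thread.

```java
import java.util.Properties;

public class ProducerCompressionSketch {
    // Hypothetical helper: builds new-producer style settings where only the
    // compression codec varies between runs.
    static Properties producerProps(String bootstrapServers, String codec) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("compression.type", codec);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address, not taken from the thread.
        Properties failing = producerProps("localhost:9092", "snappy");
        Properties workaround = producerProps("localhost:9092", "lz4");
        System.out.println("combination that showed errors: " + failing.getProperty("compression.type"));
        System.out.println("combination that did not:       " + workaround.getProperty("compression.type"));
    }
}
```

Switching the codec this way only sidesteps the symptom; it does not explain why snappy batches arrive damaged during a broker bounce.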

The error is not always the same.  I've noticed at least three types of
errors on the Kafka brokers.

1) java.io.IOException: failed to read chunk
   at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
   http://pastebin.com/NZrrEHxU
2) java.lang.OutOfMemoryError: Java heap space
   at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
   http://pastebin.com/yuxk1BjY
3) java.io.IOException: PARSING_ERROR(2)
   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
   http://pastebin.com/yq98Hx49
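
All three errors would be consistent with the decompressor being handed damaged bytes, although nothing in this thread confirms that. As a rough illustration only (this is not the snappy-java implementation, and the class and method names are hypothetical), a reader that trusts a length prefix will try to allocate whatever a damaged header claims. That is one way a corrupt stream could surface as an OutOfMemoryError instead of a parse error:

```java
import java.nio.ByteBuffer;

public class ChunkLengthSketch {
    // Hypothetical helper: interpret the first 4 bytes as a big-endian chunk length.
    static int readChunkLength(byte[] header) {
        return ByteBuffer.wrap(header, 0, 4).getInt();
    }

    // Hypothetical guard: a reader could refuse lengths outside a sane bound
    // instead of allocating whatever the header claims.
    static boolean isPlausible(int length, int maxChunkBytes) {
        return length >= 0 && length <= maxChunkBytes;
    }

    public static void main(String[] args) {
        int maxChunkBytes = 1 << 20; // 1 MiB, an arbitrary illustrative limit
        byte[] intact = {0x00, 0x00, (byte) 0x80, 0x00};  // a small, believable length
        byte[] damaged = {0x7f, 0x53, 0x4e, 0x41};        // arbitrary bytes read as a length
        for (byte[] header : new byte[][] {intact, damaged}) {
            int length = readChunkLength(header);
            System.out.println(length + " bytes claimed, plausible=" + isPlausible(length, maxChunkBytes));
        }
    }
}
```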

I've noticed a couple of different behaviors from the Samza producer/job:
A) It goes into a long retry loop where this message is logged.  I saw this
with error #1 above.

2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[9999253] Got
error produce response with correlation id 4878 on topic-partition
svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646 attempts
left). Error: CORRUPT_MESSAGE

B) The job exits with
org.apache.kafka.common.errors.UnknownServerException (at least when run as
ThreadJob).  I saw this with error #3 above.

org.apache.samza.SamzaException: Unable to send message from
TaskName-Partition 6 to system kafka.
org.apache.kafka.common.errors.UnknownServerException: The server
experienced an unexpected error when processing the request
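
The CORRUPT_MESSAGE response in (A) and the consumer-side "stored crc / computed crc" mismatch further down the thread both point at a checksum comparison failing. Below is a minimal sketch of that idea with `java.util.zip.CRC32`. The class and method names are hypothetical stand-ins, not Kafka's code, and it assumes that a retry resends the same bytes unchanged. Under that assumption every retry fails the same way, which would fit the long retry loop. The "2147483646 attempts left" in the log is Integer.MAX_VALUE minus one, i.e. one attempt used out of an effectively unlimited budget.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class CorruptRetrySketch {
    // Compute a CRC-32 over the payload bytes.
    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // Hypothetical stand-in for a receiver-side check: accept only if the
    // stored checksum matches the one computed from the received bytes.
    static boolean accepts(long storedCrc, byte[] received) {
        return storedCrc == crcOf(received);
    }

    // Resend the same bytes up to 'tries' times; return the retries left afterwards.
    static int retriesLeftAfter(long storedCrc, byte[] bytes, int retries, int tries) {
        int left = retries;
        for (int i = 0; i < tries && left > 0; i++) {
            if (accepts(storedCrc, bytes)) {
                break; // delivered, no further retries needed
            }
            left--;    // same bytes, same failure
        }
        return left;
    }

    public static void main(String[] args) {
        byte[] payload = "example record".getBytes(StandardCharsets.UTF_8);
        long stored = crcOf(payload);

        byte[] damaged = payload.clone();
        damaged[0] ^= 0x01; // flip one bit to simulate corruption

        System.out.println("intact accepted:  " + accepts(stored, payload));
        System.out.println("damaged accepted: " + accepts(stored, damaged));
        System.out.println("retries left after one failed send: "
                + retriesLeftAfter(stored, damaged, Integer.MAX_VALUE, 1));
    }
}
```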

This seems most likely to be a bug in the new Kafka producer.  I'll
probably file a JIRA for that project.

Thanks,

Roger

On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> And just to answer your first question: SIGTERM with
> controlled.shutdown.enable=true should be OK for bouncing the broker.
>
> Guozhang
>
> On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Roger,
> >
> > I believe Samza 0.9.0 already uses the Java producer.
> >
> > Java producer's close() call will try to flush all buffered data to the
> > brokers before completing the call. However, if some buffered data's
> > destination partition leader is not known, the producer will block on
> > refreshing the metadata and then retry sending.
> >
> > From the broker logs, it seems the broker does receive the produce
> > request but fails to handle it due to "Leader not local" after the bounce:
> >
> > --------
> > [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with
> > correlation id 226 from client
> > samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on partition
> > [sys.samza_metrics,0] failed due to Leader not local for partition
> > [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with
> > correlation id 45671 from client
> > samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on
> > partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] failed
> > due to Leader not local for partition
> > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 0
> > (kafka.server.KafkaApis)
> > [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with
> > correlation id 12267 from client
> > samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition
> > [sys.samza_metrics,0] failed due to Leader not local for partition
> > [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > --------
> >
> > because for these two topic-partitions (sys.samza_metrics,0 and
> > __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), the leader has
> > been moved to broker id:1,host:sit320w80m7,port:9092. When the producer
> > gets the error code from the old leader, it should refresh its metadata,
> > discover broker-1 as the new leader, and retry sending, but for some
> > reason it does not refresh its metadata. Without producer logs from the
> > Samza container I cannot investigate the issue further.
> >
> > Which Kafka version does Samza 0.9.0 use?
> >
> > Guozhang
> >
> > On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang...@gmail.com> wrote:
> >
> >> Not sure about the Kafka side. From the Samza side, from your
> >> description ("does not exit nor does it make any progress"), I think
> >> the code is stuck in producer.close
> >> <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143>;
> >> otherwise, it would throw a SamzaException and quit the job. So maybe
> >> some Kafka experts on this mailing list or the Kafka mailing list can help.
> >>
> >> Fang, Yan
> >> yanfang...@gmail.com
> >>
> >> On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover <roger.hoo...@gmail.com>
> >> wrote:
> >>
> >> > At error level logging, this was the only entry in the Samza log:
> >> >
> >> > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> >> > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2] offset[9129395]
> >> > Unable to send message from TaskName-Partition 1 to system kafka
> >> >
> >> > Here is the log from the Kafka broker that was shut down.
> >> >
> >> > http://pastebin.com/afgmLyNF
> >> >
> >> > Thanks,
> >> >
> >> > Roger
> >> >
> >> >
> >> > On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >> >
> >> > > Roger, could you paste the full log from the Samza container? If you
> >> > > can figure out which Kafka broker the message was sent to, it would
> >> > > be helpful if we could get the log from that broker as well.
> >> > >
> >> > > On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <roger.hoo...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I need some help figuring out what's going on.
> >> > > >
> >> > > > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.  All the topics
> >> > > > have a replication factor of 2.
> >> > > >
> >> > > > I'm bouncing the Kafka broker using SIGTERM (with
> >> > > > controlled.shutdown.enable=true).  I see the Samza job log this
> >> > > > message and then hang (does not exit nor does it make any progress).
> >> > > >
> >> > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> >> > > > ssp[kafka,my-topic,2] offset[9129395] Unable to send message from
> >> > > > TaskName-Partition 1 to system kafka
> >> > > >
> >> > > > The Kafka consumer (Druid Real-Time node) on the other side then
> >> > > > barfs on the message:
> >> > > >
> >> > > > Exception in thread "chief-svc-perf" kafka.message.InvalidMessageException:
> >> > > > Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
> >> > > > at kafka.message.Message.ensureValid(Message.scala:166)
> >> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
> >> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> >> > > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> >> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> >> > > > at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
> >> > > > at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)
> >> > > >
> >> > > > My questions are:
> >> > > > 1) What is the right way to bounce a Kafka broker?
> >> > > > 2) Is this a bug in Samza that the job hangs after a producer
> >> > > > request fails?  Has anyone seen this?
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Roger
> >> > > >
> >> > >
> >> >
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>
