I think this is probably a different error. I've also posted this to the
Kafka list, and I think it's due to bugs in the broker metadata update code
in the new Java producer. So there are probably no Samza issues on our
side, other than the job appearing to be frozen while the producer is
failing to discover the broker changes.

 - Dan


On Tue, 12 May 2015 at 17:04 Guozhang Wang <wangg...@gmail.com> wrote:

> Hmm, that is a little weird since if the ip addresses are not changed then
> the producer should be able to re-connect. I checked your logs again, and
> only saw two ip addresses (10.0.0.40, 10.0.2.105), and the thrown
> exceptions indicate that the producer cannot connect to them (I am assuming
> the logs you attached are after the brokers have resumed).
>
> Guozhang
>
> On Mon, May 11, 2015 at 11:53 PM, Dan <danharve...@gmail.com> wrote:
>
> > The three ip addresses were the same as before, but in a different
> > mapping to id's. Does what you said still hold true in that case?
> >
> > We are pinning the ip and id mappings so that this problem won't happen
> > for us again. I know this scenario is not an expected case.
> >
> > - Dan
> >
> >
> > On Tue, 12 May 2015 at 01:36 Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Dan,
> > >
> > > I think your scenario is different from Roger's and Garry's. For your
> > > case this is actually the expected behavior: if all brokers are gone
> > > at roughly the same time, and then re-appear with different ip
> > > addresses, the producer cannot detect the "new" brokers via metadata
> > > refresh, since there is no transition period in which some old broker
> > > can tell the producer about the new brokers. In general, you should
> > > only do rolling bounces on kafka brokers for upgrades / maintenance /
> > > etc.
> > >
> > > Guozhang
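As a concrete illustration of the metadata-refresh behavior described above, here is a minimal sketch of the new Java producer settings involved. The broker hostnames and values are illustrative assumptions, not taken from this thread; a rolling bounce keeps at least one listed broker alive so these refreshes can discover the surviving leaders.

```java
import java.util.Properties;

public class ProducerMetadataConfig {
    // Illustrative config for the new (Java) producer. During a rolling
    // bounce at least one broker stays up, so metadata refreshes can
    // still discover the current partition leaders.
    public static Properties build() {
        Properties props = new Properties();
        // Bootstrap list: only used for the initial metadata fetch.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        // Force a periodic metadata refresh even without send errors.
        props.put("metadata.max.age.ms", "300000");
        // Retries give the producer time to re-fetch metadata after a
        // NOT_LEADER_FOR_PARTITION error instead of failing immediately.
        props.put("retries", "5");
        props.put("retry.backoff.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("metadata.max.age.ms"));
    }
}
```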
> > >
> > > On Fri, May 8, 2015 at 8:24 AM, Dan <danharve...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I think we are seeing a similar error too, where Samza is getting
> > > > stuck on producing to Kafka. This was on our sandbox environment and
> > > > not production; we had all Kafka instances (3) go away briefly, then
> > > > re-appear with different ip address -> broker mappings. I know this
> > > > is not ideal and we're fixing that separately, but I would have
> > > > expected Samza/the producer to recover.
> > > >
> > > > What happened instead is that Samza (via the Kafka producer client)
> > > > got stuck in a loop outputting
> > > > "org.apache.kafka.common.network.Selector: Error in I/O with <HOST>"
> > > >
> > > > So I think there might be a bug in the Kafka client where it fails to
> > > > discover these new hosts? But Samza also does not seem to fail
> > > > quickly when it is stuck. We had to manually restart the Samza task
> > > > for this to be fixed.
> > > >
> > > > I've put a full stack trace at
> > > > https://gist.github.com/danharvey/3909d731f130a0fe1aad
> > > >
> > > > We also have a non-Samza Kafka consumer and that coped fine with the
> > > > restart. We are using Kafka 0.8.2.1 and Samza 0.9.0.
> > > >
> > > > Thanks,
> > > > Dan
> > > >
> > > >
> > > >
> > > > On Mon, 4 May 2015 at 14:37 Garry Turkington <
> > > > g.turking...@improvedigital.com> wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > Honestly snappy is so hugely more performant for me that I have
> > > > > rarely used anything but it. So I've only seen the problem using
> > > > > snappy, but since I use it all the time that may or may not be a
> > > > > valid data point.
> > > > >
> > > > > I did find some old logs from what happens client side when I see
> > > > > this happen. First, a series of communication errors along the
> > > > > lines of the following, which I think were due to a broker bouncing
> > > > > or timing out:
> > > > >
> > > > > 2015-04-15 13:54:13,890 WARN Got error produce response with
> > > > > correlation id 331092 on topic-partition <topic-name>-16, retrying
> > > > > (4 attempts left). Error: NOT_LEADER_FOR_PARTITION
> > > > > (org.apache.kafka.clients.producer.internals.Sender)
> > > > >
> > > > > But then the client dies with:
> > > > >
> > > > > java: target/snappy-1.1.1/snappy.cc:423: char*
> > > > > snappy::internal::CompressFragment(const char*, size_t, char*,
> > > > > snappy::uint16*, int): Assertion `0 == memcmp(base, candidate,
> > > > > matched)' failed.
> > > > >
> > > > > I'll try and get some better traces and post over on the kafka
> > > > > list. But it'll be after Strata this week.
> > > > >
> > > > > Cheers
> > > > > Garry
> > > > >
> > > > >
> > > > > -----Original Message-----
> > > > > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > > > > Sent: 04 May 2015 00:38
> > > > > To: dev@samza.apache.org
> > > > > Subject: Re: Errors and hung job on broker shutdown
> > > > >
> > > > > Garry,
> > > > >
> > > > > Just wondering: does this error not occur with gzip compression,
> > > > > or do you see it with any compression scheme?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Sun, May 3, 2015 at 2:32 AM, Garry Turkington <
> > > > > g.turking...@improvedigital.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Just to add another data point: I've been occasionally seeing
> > > > > > the first error with a non-Samza app using the new Kafka producer
> > > > > > with Snappy compression. I was going to post to the Kafka list,
> > > > > > but I haven't really narrowed down the circumstances yet. It most
> > > > > > often seems to happen some minutes after a broker has restarted
> > > > > > or had its ZK session time out in periods of very heavy load. But
> > > > > > I need to do more troubleshooting to have something less vague to
> > > > > > report over there.
> > > > > >
> > > > > > Garry
> > > > > >
> > > > > > -----Original Message-----
> > > > > > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > > > > > Sent: 01 May 2015 23:57
> > > > > > To: dev@samza.apache.org
> > > > > > Subject: Re: Errors and hung job on broker shutdown
> > > > > >
> > > > > > Hmm, it seems your snappy compressed data is corrupted and hence
> > > > > > keeps getting rejected by the broker, keeping the producer
> > > > > > blocked on close(). Not sure how this happens, as I have not seen
> > > > > > this error before (I wrote the new Kafka producer's compression
> > > > > > module myself, and have run it with various kinds of unit /
> > > > > > integration test cases, but did not see this coming).
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Wed, Apr 29, 2015 at 11:37 PM, Roger Hoover
> > > > > > <roger.hoo...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Guozhang and Yan,
> > > > > > >
> > > > > > > Thank you both for your responses.  I tried a lot of
> > > > > > > combinations, and I think I've determined that it's the new
> > > > > > > producer + snappy that causes the issue.
> > > > > > >
> > > > > > > It never happens with the old producer, and it never happens
> > > > > > > with lz4 or no compression.  It only happens when a broker gets
> > > > > > > restarted (or maybe just shut down).
> > > > > > >
> > > > > > > The error is not always the same.  I've noticed at least three
> > > > > > > types of errors on the Kafka brokers.
> > > > > > >
> > > > > > > 1) java.io.IOException: failed to read chunk at
> > > > > > >    org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
> > > > > > >    http://pastebin.com/NZrrEHxU
> > > > > > > 2) java.lang.OutOfMemoryError: Java heap space
> > > > > > >    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
> > > > > > >    http://pastebin.com/yuxk1BjY
> > > > > > > 3) java.io.IOException: PARSING_ERROR(2)
> > > > > > >    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> > > > > > >    http://pastebin.com/yq98Hx49
> > > > > > >
> > > > > > > I've noticed a couple of different behaviors from the Samza
> > > > > > > producer/job.
> > > > > > > A) It goes into a long retry loop where this message is logged.
> > > > > > > I saw this with error #1 above.
> > > > > > >
> > > > > > > 2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
> > > > > > > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7]
> > > > > > > offset[9999253] Got error produce response with correlation id
> > > > > > > 4878 on topic-partition svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1,
> > > > > > > retrying (2147483646 attempts left). Error: CORRUPT_MESSAGE
> > > > > > >
> > > > > > > B) The job exits with
> > > > > > > org.apache.kafka.common.errors.UnknownServerException (at least
> > > > > > > when run as ThreadJob).  I saw this with error #3 above.
> > > > > > >
> > > > > > > org.apache.samza.SamzaException: Unable to send message from
> > > > > > > TaskName-Partition 6 to system kafka.
> > > > > > > org.apache.kafka.common.errors.UnknownServerException: The
> > > > > > > server experienced an unexpected error when processing the
> > > > > > > request
> > > > > > >
> > > > > > > This seems most likely to be a bug in the new Kafka producer.
> > > > > > > I'll probably file a JIRA for that project.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Roger
> > > > > > >
> > > > > > > On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > And just to answer your first question: SIGTERM with
> > > > > > > > controlled.shutdown.enable=true should be OK for bouncing
> > > > > > > > the broker.
> > > > > > > >
> > > > > > > > Guozhang
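The rolling-bounce advice earlier in the thread can be sketched as a script. The hostnames, pid path, and ssh step below are placeholder assumptions, and controlled.shutdown.enable=true must already be set in each broker's server.properties:

```shell
# Hypothetical rolling-bounce sketch: one broker at a time, so the
# producer always has a live broker to refresh metadata from.
rolling_bounce() {
  for broker in broker1 broker2 broker3; do
    echo "bouncing $broker"
    # SIGTERM triggers controlled shutdown (leadership migrates first):
    # ssh "$broker" 'kill -TERM "$(cat /var/run/kafka/kafka.pid)"'
    # In a real bounce, wait here until the broker is back and the
    # under-replicated partition count returns to zero.
  done
  echo "rolling bounce complete"
}

rolling_bounce
```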
> > > > > > > >
> > > > > > > > On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang
> > > > > > > > <wangg...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Roger,
> > > > > > > > >
> > > > > > > > > I believe Samza 0.9.0 already uses the Java producer.
> > > > > > > > >
> > > > > > > > > The Java producer's close() call will try to flush all
> > > > > > > > > buffered data to the brokers before completing the call.
> > > > > > > > > However, if some buffered data's destination partition
> > > > > > > > > leader is not known, the producer will block on refreshing
> > > > > > > > > the metadata and then retry sending.
> > > > > > > > >
> > > > > > > > > From the broker logs, it seems the broker does receive the
> > > > > > > > > producer request but fails to handle it due to "Leader not
> > > > > > > > > local" after the bounce:
> > > > > > > > >
> > > > > > > > > --------
> > > > > > > > > [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request
> > > > > > > > > with correlation id 226 from client
> > > > > > > > > samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3
> > > > > > > > > on partition [sys.samza_metrics,0] failed due to Leader not
> > > > > > > > > local for partition [sys.samza_metrics,0] on broker 0
> > > > > > > > > (kafka.server.KafkaApis)
> > > > > > > > > [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request
> > > > > > > > > with correlation id 45671 from client
> > > > > > > > > samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4
> > > > > > > > > on partition
> > > > > > > > > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0]
> > > > > > > > > failed due to Leader not local for partition
> > > > > > > > > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on
> > > > > > > > > broker 0 (kafka.server.KafkaApis)
> > > > > > > > > [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request
> > > > > > > > > with correlation id 12267 from client
> > > > > > > > > samza_producer-svc_call_join_deploy-1-1429911471254-0 on
> > > > > > > > > partition [sys.samza_metrics,0] failed due to Leader not
> > > > > > > > > local for partition [sys.samza_metrics,0] on broker 0
> > > > > > > > > (kafka.server.KafkaApis)
> > > > > > > > > --------
> > > > > > > > >
> > > > > > > > > because for these two topic-partitions (sys.samza_metrics,0
> > > > > > > > > and __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0),
> > > > > > > > > their leader has been moved to broker
> > > > > > > > > id:1,host:sit320w80m7,port:9092. When the producer gets the
> > > > > > > > > error code from the old leader, it should refresh its
> > > > > > > > > metadata, get the new leader as broker-1, and retry
> > > > > > > > > sending; but for some reason it does not refresh its
> > > > > > > > > metadata. Without producer logs from the Samza container I
> > > > > > > > > cannot investigate the issue further.
> > > > > > > > >
> > > > > > > > > Which Kafka version does Samza 0.9.0 use?
> > > > > > > > >
> > > > > > > > > Guozhang
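Since close() can block indefinitely in the situation described above, one defensive pattern is to bound it from a watchdog thread. This is a sketch only: the 0.8.2-era new producer has no close-with-timeout API, the Closer interface below is a hypothetical stand-in for the producer's close(), and this is a workaround pattern rather than the Kafka client's own mechanism.

```java
// Sketch of bounding a potentially-hanging close() from a watchdog
// thread. Closer stands in for the producer's close(); this is a
// workaround pattern, not the Kafka client's own API.
public class BoundedClose {
    interface Closer { void close(); }

    static boolean closeWithin(Closer c, long timeoutMs) throws InterruptedException {
        Thread t = new Thread(c::close, "producer-closer");
        t.setDaemon(true); // do not keep the JVM alive if close() hangs
        t.start();
        t.join(timeoutMs);
        return !t.isAlive(); // true when close() finished within the timeout
    }

    public static void main(String[] args) throws InterruptedException {
        // A close() that returns promptly completes within the bound.
        System.out.println(closeWithin(() -> { }, 1000));
    }
}
```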
> > > > > > > > >
> > > > > > > > > On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > >> Not sure about the Kafka side. From the Samza side, from
> > > > > > > > >> your description ("does not exit nor does it make any
> > > > > > > > >> progress"), I think the code is stuck in producer.close
> > > > > > > > >> <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143>;
> > > > > > > > >> otherwise, it would throw a SamzaException to quit the
> > > > > > > > >> job. So maybe some Kafka experts on this mailing list or
> > > > > > > > >> the Kafka mailing list can help.
> > > > > > > > >>
> > > > > > > > >> Fang, Yan
> > > > > > > > >> yanfang...@gmail.com
> > > > > > > > >>
> > > > > > > > >> On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> > > > > > > > >>
> > > > > > > > >> > At error level logging, this was the only entry in the
> > > > > > > > >> > Samza log:
> > > > > > > > >> >
> > > > > > > > >> > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR]
> > > > > > > > >> > task[Partition 2]
> > > > > > > > >> > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2]
> > > > > > > > >> > offset[9129395] Unable to send message from
> > > > > > > > >> > TaskName-Partition 1 to system kafka
> > > > > > > > >> >
> > > > > > > > >> > Here is the log from the Kafka broker that was shutdown.
> > > > > > > > >> >
> > > > > > > > >> > http://pastebin.com/afgmLyNF
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> >
> > > > > > > > >> > Roger
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Roger, could you paste the full log from the Samza
> > > > > > > > >> > > container? If you can figure out which Kafka broker
> > > > > > > > >> > > the message was sent to, it would be helpful if we
> > > > > > > > >> > > could get the log from that broker as well.
> > > > > > > > >> > >
> > > > > > > > >> > > On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > Hi,
> > > > > > > > >> > > >
> > > > > > > > >> > > > I need some help figuring out what's going on.
> > > > > > > > >> > > >
> > > > > > > > >> > > > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.
> > > > > > > > >> > > > All the topics have a replication factor of 2.
> > > > > > > > >> > > >
> > > > > > > > >> > > > I'm bouncing the Kafka broker using SIGTERM (with
> > > > > > > > >> > > > controlled.shutdown.enable=true).  I see the Samza
> > > > > > > > >> > > > job log this message and then hang (it does not
> > > > > > > > >> > > > exit nor does it make any progress).
> > > > > > > > >> > > >
> > > > > > > > >> > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR]
> > > > > > > > >> > > > task[Partition 2] ssp[kafka,my-topic,2]
> > > > > > > > >> > > > offset[9129395] Unable to send message from
> > > > > > > > >> > > > TaskName-Partition 1 to system kafka
> > > > > > > > >> > > >
> > > > > > > > >> > > > The Kafka consumer (Druid Real-Time node) on the
> > > > > > > > >> > > > other side then barfs on the message:
> > > > > > > > >> > > >
> > > > > > > > >> > > > Exception in thread "chief-svc-perf"
> > > > > > > > >> > > > kafka.message.InvalidMessageException: Message is
> > > > > > > > >> > > > corrupt (stored crc = 1792882425, computed crc = 3898271689)
> > > > > > > > >> > > > at kafka.message.Message.ensureValid(Message.scala:166)
> > > > > > > > >> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
> > > > > > > > >> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> > > > > > > > >> > > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> > > > > > > > >> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> > > > > > > > >> > > > at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
> > > > > > > > >> > > > at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)
> > > > > > > > >> > > >
> > > > > > > > >> > > > My questions are:
> > > > > > > > >> > > > 1) What is the right way to bounce a Kafka broker?
> > > > > > > > >> > > > 2) Is this a bug in Samza, that the job hangs after
> > > > > > > > >> > > > a producer request fails? Has anyone seen this?
> > > > > > > > >> > > >
> > > > > > > > >> > > > Thanks,
> > > > > > > > >> > > >
> > > > > > > > >> > > > Roger
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
