I think this is probably a different error. I've also posted this to the Kafka list, and I think it's due to bugs in the broker metadata update code in the new Java producer. So there are probably no Samza issues on our side, other than the job appearing to be frozen while the producer fails to discover the broker changes.
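Guozhang's explanation in the quoted reply below says the producer only learns about brokers from brokers it already knows, so replacing every broker at once leaves nothing to ask. A toy model makes this concrete. It is a hypothetical sketch, not Kafka client code: the class `MetadataDiscoverySketch`, its `refresh` method, and the single-letter broker addresses are invented for illustration. It models the case where addresses change, not the ID-remapping case Dan describes.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of metadata-based broker discovery (hypothetical; not Kafka client code).
class MetadataDiscoverySketch {

    // Broker addresses the producer currently knows about.
    private final Set<String> known = new LinkedHashSet<>();

    MetadataDiscoverySketch(List<String> bootstrap) {
        known.addAll(bootstrap);
    }

    // A refresh can only be answered by a broker the producer already knows.
    // If one of them is alive, it reports the full current membership.
    boolean refresh(Set<String> liveBrokers) {
        boolean reachable = known.stream().anyMatch(liveBrokers::contains);
        if (!reachable) {
            return false; // every known address is gone: stuck with stale metadata
        }
        known.clear();
        known.addAll(liveBrokers);
        return true;
    }

    Set<String> known() {
        return known;
    }

    public static void main(String[] args) {
        // Rolling bounce: one broker is replaced at a time, so an old one always answers.
        MetadataDiscoverySketch rolling = new MetadataDiscoverySketch(List.of("a", "b", "c"));
        System.out.println(rolling.refresh(Set.of("d", "b", "c"))); // true
        System.out.println(rolling.refresh(Set.of("d", "e", "c"))); // true
        System.out.println(rolling.refresh(Set.of("d", "e", "f"))); // true

        // All brokers replaced at once: none of the known addresses answers.
        MetadataDiscoverySketch allAtOnce = new MetadataDiscoverySketch(List.of("a", "b", "c"));
        System.out.println(allAtOnce.refresh(Set.of("d", "e", "f"))); // false
    }
}
```

In the rolling case, every refresh reaches at least one broker from the previous membership, so the known set follows the cluster. In the all-at-once case, none of the stale addresses answer and every later refresh fails the same way. This is consistent with the repeated "Error in I/O" loop Dan reports.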
- Dan

On Tue, 12 May 2015 at 17:04 Guozhang Wang <wangg...@gmail.com> wrote:

Hmm, that is a little weird, since if the IP addresses did not change, the producer should be able to reconnect. I checked your logs again and only saw two IP addresses (10.0.0.40, 10.0.2.105), and the thrown exceptions indicate that the producer cannot connect to them (I am assuming the logs you attached are from after the brokers resumed).

Guozhang

On Mon, May 11, 2015 at 11:53 PM, Dan <danharve...@gmail.com> wrote:

The three IP addresses were the same as before, but with a different mapping to IDs. Does what you said still hold true in that case?

We are making the IP and ID mappings static so this problem won't happen for us again. I know this scenario is not an expected case.

- Dan

On Tue, 12 May 2015 at 01:36 Guozhang Wang <wangg...@gmail.com> wrote:

Hello Dan,

I think your scenario is different from Roger's and Garry's. For your case this is actually the expected behavior: if all brokers are gone at roughly the same time and then reappear with different IP addresses, the producer cannot detect the "new" brokers via a metadata refresh, since there is no transition period in which some old broker can tell the producer about the new brokers. In general, you should only do rolling bounces on Kafka brokers for upgrades, maintenance, etc.

Guozhang

On Fri, May 8, 2015 at 8:24 AM, Dan <danharve...@gmail.com> wrote:

Hi,

I think we are seeing a similar error too, where Samza gets stuck producing to Kafka. This was in our sandbox environment, not production: all three Kafka instances went away briefly and then reappeared with different IP address -> broker mappings.
I know this is not ideal and we're fixing that separately, but I would have expected Samza/the producer to recover.

What happened instead is that Samza (via the Kafka producer client) got stuck in a loop outputting "org.apache.kafka.common.network.Selector: Error in I/O with <HOST>".

So I think there might be a bug in the Kafka client where it fails to discover these new hosts? But Samza also does not seem to fail very quickly when it is stuck. We had to manually restart the Samza task to fix this.

I've put a full stack trace at https://gist.github.com/danharvey/3909d731f130a0fe1aad

We also have a non-Samza Kafka consumer, and that coped fine with the restart. We are using Kafka 0.8.2.1 and Samza 0.9.0.

Thanks,
Dan

On Mon, 4 May 2015 at 14:37 Garry Turkington <g.turking...@improvedigital.com> wrote:

Hi Guozhang,

Honestly, Snappy is so much more performant for me that I have rarely used anything else. So I've only seen the problem with Snappy, but since I use it all the time, that may or may not be a valid data point.

I did find some old logs of what happens client-side when this occurs. First, a series of communication errors along the lines of the following, which I think were due to a broker bouncing or timing out:

WARN Got error produce response with correlation id 331092 on topic-partition <topic-name>-16, retrying (4 attempts left).
Error: NOT_LEADER_FOR_PARTITION 2015-04-15 13:54:13,890 (org.apache.kafka.clients.producer.internals.Sender)

But then the client dies with:

java: target/snappy-1.1.1/snappy.cc:423: char* snappy::internal::CompressFragment(const char*, size_t, char*, snappy::uint16*, int): Assertion `0 == memcmp(base, candidate, matched)' failed.

I'll try to get some better traces and post over on the Kafka list, but it'll be after Strata this week.

Cheers
Garry

-----Original Message-----
From: Guozhang Wang [mailto:wangg...@gmail.com]
Sent: 04 May 2015 00:38
To: dev@samza.apache.org
Subject: Re: Errors and hung job on broker shutdown

Garry,

Just wondering, does this error not occur with Gzip compression? Or can you see it with any compression scheme?

Guozhang

On Sun, May 3, 2015 at 2:32 AM, Garry Turkington <g.turking...@improvedigital.com> wrote:

Hi,

Just to add another data point: I've occasionally been seeing the first error with a non-Samza app using the new Kafka producer with Snappy compression. I was going to post to the Kafka list, but I haven't really narrowed down the situations yet. It seems to happen most often some minutes after a broker has restarted or had its ZK session time out during periods of very heavy load. But I need to do more troubleshooting to have something less vague to report over there.
Garry

-----Original Message-----
From: Guozhang Wang [mailto:wangg...@gmail.com]
Sent: 01 May 2015 23:57
To: dev@samza.apache.org
Subject: Re: Errors and hung job on broker shutdown

Hmm, it seems your Snappy-compressed data is corrupted and hence keeps getting rejected by the broker, which keeps the producer blocked on close(). I'm not sure how this happens, as I have never seen this error before (I wrote the new Kafka producer's compression module myself and have run it through various unit and integration test cases, but did not see this coming).

Guozhang

On Wed, Apr 29, 2015 at 11:37 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:

Guozhang and Yan,

Thank you both for your responses. I tried a lot of combinations, and I think I've determined that it's the new producer + Snappy that causes the issue.

It never happens with the old producer, and it never happens with LZ4 or no compression. It only happens when a broker gets restarted (or maybe just shut down).

The error is not always the same. I've noticed at least three types of errors on the Kafka brokers.
1) java.io.IOException: failed to read chunk
   at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
   http://pastebin.com/NZrrEHxU
2) java.lang.OutOfMemoryError: Java heap space
   at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
   http://pastebin.com/yuxk1BjY
3) java.io.IOException: PARSING_ERROR(2)
   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
   http://pastebin.com/yq98Hx49

I've noticed a couple of different behaviors from the Samza producer/job:

A) It goes into a long retry loop where this message is logged. I saw this with error #1 above.

2015-04-29 18:17:31 Sender [WARN] task[Partition 7] ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[9999253] Got error produce response with correlation id 4878 on topic-partition svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646 attempts left). Error: CORRUPT_MESSAGE

B) The job exits with org.apache.kafka.common.errors.UnknownServerException (at least when run as ThreadJob). I saw this with error #3 above.

org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 6 to system kafka.
org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request

This seems most likely to be a bug in the new Kafka producer. I'll probably file a JIRA for that project.
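In log A, `2147483646 attempts left` is one less than `Integer.MAX_VALUE` (2147483647). That suggests the producer's retry count was configured at or near that maximum. If the broker rejects the same batch every time (as with `CORRUPT_MESSAGE`), such a budget never runs out in practice, whereas a finite one eventually fails the send so the error can reach the caller. The following is a hypothetical sketch of a bounded retry loop. `RetryBudgetSketch`, `send`, and the `Outcome` enum are invented names, not Kafka or Samza APIs, and the loop is not Kafka's actual `Sender` logic.

```java
import java.util.function.IntPredicate;

// Hypothetical sketch of a bounded retry loop (not Kafka's Sender logic).
class RetryBudgetSketch {

    enum Outcome { SENT, FAILED }

    // attempt.test(n) returns true if attempt n (0-based) was accepted by the broker.
    // Makes at most retries + 1 attempts: the first send plus `retries` retries.
    static Outcome send(int retries, IntPredicate attempt) {
        for (long n = 0; n <= retries; n++) { // long counter avoids overflow at Integer.MAX_VALUE
            if (attempt.test((int) n)) {
                return Outcome.SENT;
            }
        }
        return Outcome.FAILED; // budget exhausted: report the error instead of looping
    }

    public static void main(String[] args) {
        // Transient error (e.g. NOT_LEADER_FOR_PARTITION during a bounce): third attempt succeeds.
        System.out.println(send(3, n -> n >= 2)); // SENT
        // A batch the broker rejects every time (e.g. CORRUPT_MESSAGE): the budget runs out.
        System.out.println(send(3, n -> false)); // FAILED
        // With retries = Integer.MAX_VALUE the second case would make 2^31 attempts,
        // which in practice looks like a hang.
    }
}
```

Retrying makes sense for transient errors such as a leader moving during a bounce. It only prolongs the stall when every attempt is rejected for the same reason.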
Thanks,

Roger

On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang <wangg...@gmail.com> wrote:

And just to answer your first question: SIGTERM with controlled.shutdown.enable=true should be OK for bouncing the broker.

Guozhang

On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Roger,

I believe Samza 0.9.0 already uses the Java producer.

The Java producer's close() call will try to flush all buffered data to the brokers before completing. However, if the leader of a buffered record's destination partition is not known, the producer will block on refreshing the metadata and then retry sending.
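The close() behavior described above can be modeled as waiting on a future that completes once a metadata refresh names a partition leader. This is a hypothetical sketch built on `java.util.concurrent.CompletableFuture`. `FlushOnCloseSketch`, `onMetadataRefresh`, `closeBlocking`, and `closeWithin` are invented names, not the Kafka producer's API. The unbounded variant never returns if the leader is never discovered, which is consistent with the hang reported in this thread. The bounded variant lets the caller fail fast instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a producer whose close() must flush buffered data to a
// partition leader first. Not the Kafka producer's API.
class FlushOnCloseSketch {

    // Completes with the leader's broker id once a metadata refresh names a leader.
    private final CompletableFuture<Integer> leaderKnown = new CompletableFuture<>();

    // Simulates a successful metadata refresh.
    void onMetadataRefresh(int leaderId) {
        leaderKnown.complete(leaderId);
    }

    // Unbounded close: waits for as long as the leader is unknown.
    // If metadata is never refreshed, this call never returns.
    int closeBlocking() {
        return leaderKnown.join(); // then flush the buffered data to this leader
    }

    // Bounded close: gives up after the timeout so the caller can fail fast.
    boolean closeWithin(long timeout, TimeUnit unit) {
        try {
            leaderKnown.get(timeout, unit);
            return true; // leader known: buffered data can be flushed
        } catch (TimeoutException | ExecutionException e) {
            return false; // timed out (or the refresh failed): report the unsent data
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FlushOnCloseSketch stuck = new FlushOnCloseSketch();
        System.out.println(stuck.closeWithin(100, TimeUnit.MILLISECONDS)); // false

        FlushOnCloseSketch recovered = new FlushOnCloseSketch();
        recovered.onMetadataRefresh(1); // e.g. leadership moved to broker 1
        System.out.println(recovered.closeWithin(100, TimeUnit.MILLISECONDS)); // true
        System.out.println(recovered.closeBlocking()); // 1
    }
}
```

Whether a timeout-bounded close is available depends on the Kafka client version. Newer `KafkaProducer` releases add a `close` overload that takes a timeout, so check the version you run.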
From the broker logs, it seems the broker does receive the producer request but fails to handle it due to "Leader not local" after the bounce:

--------
[2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with correlation id 226 from client samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on partition [sys.samza_metrics,0] failed due to Leader not local for partition [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
[2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with correlation id 45671 from client samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] failed due to Leader not local for partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 0 (kafka.server.KafkaApis)
[2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with correlation id 12267 from client samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition [sys.samza_metrics,0] failed due to Leader not local for partition [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
--------

This is because, for these two topic-partitions (sys.samza_metrics,0 and __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), the leader has been moved to broker id:1,host:sit320w80m7,port:9092.
When the producer gets the error code from the old leader, it should refresh its metadata, learn that the new leader is broker 1, and retry sending, but for some reason it does not refresh its metadata. Without producer logs from the Samza container I cannot investigate the issue further.

Which Kafka version does Samza 0.9.0 use?

Guozhang

On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang...@gmail.com> wrote:

Not sure about the Kafka side. From the Samza side, based on your description ("does not exit nor does it make any progress"), I think the code is stuck in producer.close <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143>; otherwise, it would throw a SamzaException to quit the job.
So maybe some Kafka experts on this mailing list or the Kafka mailing list can help.

Fang, Yan
yanfang...@gmail.com

On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:

At error-level logging, this was the only entry in the Samza log:

2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2] ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2] offset[9129395] Unable to send message from TaskName-Partition 1 to system kafka

Here is the log from the Kafka broker that was shut down:

http://pastebin.com/afgmLyNF

Thanks,

Roger

On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpa...@gmail.com> wrote:

Roger, could you paste the full log from the Samza container? If you can figure out which Kafka broker the message was sent to, it would be helpful if we could get the log from that broker as well.

On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:

Hi,

I need some help figuring out what's going on.
I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN. All the topics have a replication factor of 2.

I'm bouncing the Kafka broker using SIGTERM (with controlled.shutdown.enable=true). I see the Samza job log this message and then hang (it neither exits nor makes any progress):

2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2] ssp[kafka,my-topic,2] offset[9129395] Unable to send message from TaskName-Partition 1 to system kafka

The Kafka consumer (Druid Real-Time node) on the other side then barfs on the message:

Exception in thread "chief-svc-perf" kafka.message.InvalidMessageException: Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
    at kafka.message.Message.ensureValid(Message.scala:166)
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
    at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)

My questions are:
1) What is the right way to bounce a Kafka broker?
2) Is it a bug in Samza that the job hangs after a producer request fails? Has anyone seen this?

Thanks,

Roger
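The consumer-side error above comes from a checksum comparison: the consumer recomputes a CRC over the message bytes and compares it with the CRC stored in the message. The following is a simplified, hypothetical sketch of that check using `java.util.zip.CRC32`. The layout (a 4-byte CRC followed by the payload it covers) and the names `CrcCheckSketch`, `encode`, and `ensureValid` are assumptions for illustration. Kafka's real message format contains additional fields.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Simplified, hypothetical message layout: [4-byte CRC32 of payload][payload].
class CrcCheckSketch {

    static long crc32(byte[] data, int offset, int length) {
        CRC32 crc = new CRC32();
        crc.update(data, offset, length);
        return crc.getValue(); // unsigned 32-bit value held in a long
    }

    // Producer side: prepend the checksum of the payload.
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt((int) crc32(payload, 0, payload.length));
        buf.put(payload);
        return buf.array();
    }

    // Consumer side: recompute the checksum and compare it with the stored one.
    static void ensureValid(byte[] message) {
        long stored = ByteBuffer.wrap(message).getInt() & 0xFFFFFFFFL; // read back as unsigned
        long computed = crc32(message, 4, message.length - 4);
        if (stored != computed) {
            throw new IllegalStateException(
                "Message is corrupt (stored crc = " + stored + ", computed crc = " + computed + ")");
        }
    }

    public static void main(String[] args) {
        byte[] message = encode("hello".getBytes(StandardCharsets.UTF_8));
        ensureValid(message); // intact message passes

        message[message.length - 1] ^= 0x01; // flip one bit in the payload
        try {
            ensureValid(message);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // reports the stored and computed CRCs
        }
    }
}
```

CRC32 values are unsigned 32-bit numbers, so they are kept in a `long` here. This is also why both values in the log above fit in 32 bits even though one of them exceeds `Integer.MAX_VALUE`.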