And just to answer your first question: SIGTERM with
controlled.shutdown.enable=true should be OK for bouncing the broker.

Guozhang

On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Roger,
>
> I believe Samza 0.9.0 already uses the Java producer.
>
> Java producer's close() call will try to flush all buffered data to the
> brokers before completing the call. However, if some buffered data's
> destination partition leader is not known, the producer will block on
> refreshing the metadata and then retry sending.
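
To make the consequence of a blocking close() concrete: a caller that must not hang can bound the wait itself. Below is a minimal sketch in Python (for brevity only; the real producer is Java). close_with_timeout and the stand-in close functions are hypothetical names for illustration and are not part of Kafka or Samza.

```python
import threading
import time


def close_with_timeout(close_fn, timeout_s):
    """Run close_fn on a daemon thread; return True if it finished in time.

    close_fn is a hypothetical stand-in for a blocking call such as the
    producer's close(). This helper is an assumption, not a Kafka/Samza API.
    """
    worker = threading.Thread(target=close_fn, name="bounded-close", daemon=True)
    worker.start()
    worker.join(timeout_s)          # wait at most timeout_s seconds
    return not worker.is_alive()    # False means close_fn is still blocked


if __name__ == "__main__":
    fast_close = lambda: None                # returns immediately
    stuck_close = lambda: time.sleep(2.0)    # simulates a close() that blocks
    print(close_with_timeout(fast_close, 1.0))
    print(close_with_timeout(stuck_close, 0.1))
```

Note that this does not unblock the stuck call. It only lets the caller notice the hang and decide what to do next (log, fail the job, and so on).
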
>
> From the broker logs, it seems the broker does receive the produce requests
> but fails to handle them due to "Leader not local" after the bounce:
>
> --------
> [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with
> correlation id 226 from client
> samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on partition
> [sys.samza_metrics,0] failed due to Leader not local for partition
> [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with
> correlation id 45671 from client
> samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on
> partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] failed
> due to Leader not local for partition
> [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 0
> (kafka.server.KafkaApis)
> [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with
> correlation id 12267 from client
> samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition
> [sys.samza_metrics,0] failed due to Leader not local for partition
> [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> --------
>
> because for these two topic-partitions (sys.samza_metrics,0 and
> __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), the leader has
> moved to broker id:1,host:sit320w80m7,port:9092. When the producer gets the
> error code from the old leader, it should refresh its metadata, find the
> new leader (broker 1), and retry sending, but for some reason it does not
> refresh its metadata. Without the producer logs from the Samza container I
> cannot investigate the issue further.
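
The expected refresh-and-retry behavior described above can be sketched as a toy model. This is Python with an in-memory "cluster", not the actual producer code. ToyCluster, NotLeaderError, and send_with_refresh are hypothetical names, and max_retries is an assumed parameter, not a real producer setting.

```python
class NotLeaderError(Exception):
    """Toy stand-in for the broker's "Leader not local" error."""


class ToyCluster:
    """Hypothetical in-memory cluster: maps a partition to its leader broker id."""

    def __init__(self, leaders):
        self.leaders = dict(leaders)

    def leader_for(self, partition):
        return self.leaders[partition]

    def produce(self, broker_id, partition, message):
        # Only the current leader accepts writes for a partition.
        if broker_id != self.leaders[partition]:
            raise NotLeaderError(f"broker {broker_id} is not leader for {partition}")
        return broker_id


def send_with_refresh(cluster, cached_leaders, partition, message, max_retries=3):
    """Send to the cached leader; on NotLeaderError refresh the cache and retry.

    Returns (broker_id, attempts). Raises NotLeaderError if retries run out.
    """
    for attempt in range(1, max_retries + 2):
        try:
            broker = cluster.produce(cached_leaders[partition], partition, message)
            return broker, attempt
        except NotLeaderError:
            if attempt > max_retries:
                raise
            # Metadata refresh: the step that seems to be missing in the hang.
            cached_leaders[partition] = cluster.leader_for(partition)


if __name__ == "__main__":
    tp = ("sys.samza_metrics", 0)
    cluster = ToyCluster({tp: 1})   # leadership has moved to broker 1
    cached = {tp: 0}                # the producer still believes broker 0 leads
    print(send_with_refresh(cluster, cached, tp, b"metric"))
```

In this model the first attempt goes to the stale leader and fails, the cache is refreshed, and the second attempt reaches broker 1. If the refresh step never runs, every retry hits the old leader again, which would match the repeated broker-side warnings.
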
>
> Which Kafka version does Samza 0.9.0 use?
>
> Guozhang
>
> On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang...@gmail.com> wrote:
>
>> Not sure about the Kafka side. From the Samza side, judging from your
>> description ("does not exit nor does it make any progress"), I think the
>> code is stuck in producer.close
>> <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143>;
>> otherwise, it would throw a SamzaException and quit the job. So maybe some
>> Kafka experts on this mailing list or the Kafka mailing list can help.
>>
>> Fang, Yan
>> yanfang...@gmail.com
>>
>> On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>>
>> > At error level logging, this was the only entry in the Samza log:
>> >
>> > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
>> > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2] offset[9129395]
>> > Unable to send message from TaskName-Partition 1 to system kafka
>> >
>> > Here is the log from the Kafka broker that was shut down.
>> >
>> > http://pastebin.com/afgmLyNF
>> >
>> > Thanks,
>> >
>> > Roger
>> >
>> >
>> > On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> >
>> > > Roger, could you paste the full log from Samza container? If you can
>> > > figure out which Kafka broker the message was sent to, it would be
>> > > helpful if we get the log from the broker as well.
>> > >
>> > > On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I need some help figuring out what's going on.
>> > > >
>> > > > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.  All the topics have
>> > > > replication factor of 2.
>> > > >
>> > > > I'm bouncing the Kafka broker using SIGTERM (with
>> > > > controlled.shutdown.enable=true).  I see the Samza job log this message
>> > > > and then hang (does not exit nor does it make any progress).
>> > > >
>> > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
>> > > > ssp[kafka,my-topic,2] offset[9129395] Unable to send message from
>> > > > TaskName-Partition 1 to system kafka
>> > > >
>> > > > The Kafka consumer (Druid Real-Time node) on the other side then barfs
>> > > > on the message:
>> > > >
>> > > > Exception in thread "chief-svc-perf" kafka.message.InvalidMessageException:
>> > > > Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
>> > > > at kafka.message.Message.ensureValid(Message.scala:166)
>> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
>> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>> > > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>> > > > at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
>> > > > at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)
>> > > >
>> > > > My questions are:
>> > > > 1) What is the right way to bounce a Kafka broker?
>> > > > 2) Is this a bug in Samza that the job hangs after producer request fails?
>> > > > Has anyone seen this?
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Roger
>> > > >
>> > >
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
