[ https://issues.apache.org/jira/browse/SAMZA-233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Riccomini updated SAMZA-233:
----------------------------------
Attachment: SAMZA-233.0.patch
Attaching patch. RB at https://reviews.apache.org/r/20375/.
Changes are:
1. Make BrokerProxy.start check whether its thread is already started, so the thread is never started twice.
2. Change KafkaSystemConsumer to always start broker proxies in refreshBrokers.
3. Remove the BrokerProxy.start calls from KafkaSystemConsumer.start, since
they're handled by refreshBrokers now.
4. Add a test to verify that abdication triggers a start call.
5. Make TopicMetadataStore mockable for KafkaSystemConsumer.
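For illustration, change (1) might look roughly like the sketch below. It assumes the proxy owns a single worker Thread; `IdempotentProxy`, `runs`, and `awaitRun` are hypothetical names, not Samza's actual BrokerProxy code. The guard matters because calling `Thread.start` twice throws `IllegalThreadStateException`.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for BrokerProxy: owns one fetch thread and must
// never start it twice, no matter how often start() is called.
class IdempotentProxy(name: String) {
  private val started = new AtomicBoolean(false)
  // Counts how many times the worker body actually ran (illustration only).
  val runs = new AtomicInteger(0)
  private val done = new CountDownLatch(1)

  private val thread = new Thread(new Runnable {
    def run(): Unit = {
      runs.incrementAndGet()
      done.countDown()
    }
  }, "proxy-" + name)
  thread.setDaemon(true)

  // compareAndSet makes check-then-start atomic, so two callers (for example
  // the consumer's start() and a refresh triggered by abdication) cannot both
  // start the thread.
  def start(): Unit =
    if (started.compareAndSet(false, true)) {
      println("Starting " + name)
      thread.start()
    }

  def isStarted: Boolean = started.get()
  def awaitRun(): Unit = done.await()
}

val proxy = new IdempotentProxy("localhost:10251")
proxy.start()
proxy.start() // no-op instead of IllegalThreadStateException
proxy.awaitRun()
```

An atomic flag is used here rather than `thread.isAlive`, since the latter leaves a window between the check and the call in which two callers could both decide to start the thread.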
Getting (4) done took some back-bending. I mostly had to extract logic from
refreshBrokers to remove the Kafka calls and to avoid using Kafka's
Broker class, which is package-private.
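One way to extract testable logic like this, sketched under the assumption that the leader lookup can be reduced to a plain function returning a `(host, port)` pair: `TopicPartitionKey`, `BrokerAssignment.assign`, and the stubbed hosts below are hypothetical and are not the patch's actual code.

```scala
// Hypothetical, Kafka-free representation of a topic-partition.
final case class TopicPartitionKey(topic: String, partition: Int)

object BrokerAssignment {
  type HostPort = (String, Int)

  // Groups partitions by the (host, port) of their leader. The leader lookup
  // is passed in as a plain function, so a test can stub it without touching
  // Kafka's package-private Broker class or issuing a metadata request.
  // Returns the grouped partitions plus those with no known leader.
  def assign(
      partitions: Seq[TopicPartitionKey],
      leaderFor: TopicPartitionKey => Option[HostPort]
  ): (Map[HostPort, Seq[TopicPartitionKey]], Seq[TopicPartitionKey]) = {
    val (found, missing) =
      partitions.map(tp => (tp, leaderFor(tp))).partition(_._2.isDefined)
    val grouped =
      found.groupBy(_._2.get).map { case (hp, pairs) => hp -> pairs.map(_._1) }
    (grouped, missing.map(_._1))
  }
}

// Usage with a stubbed metadata lookup (hosts and ports are made up).
val leaders = Map(
  TopicPartitionKey("MY-TOPIC", 105) -> ("localhost", 10251),
  TopicPartitionKey("MY-TOPIC", 106) -> ("localhost", 10252)
)
val (byBroker, dropped) = BrokerAssignment.assign(
  Seq(
    TopicPartitionKey("MY-TOPIC", 105),
    TopicPartitionKey("MY-TOPIC", 106),
    TopicPartitionKey("MY-TOPIC", 107)
  ),
  leaders.get
)
```

Because `assign` depends only on its arguments, a unit test can cover the "partition moved to a new broker" case by changing the stub map between calls.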
> BrokerProxies are never started when created during abdication
> --------------------------------------------------------------
>
> Key: SAMZA-233
> URL: https://issues.apache.org/jira/browse/SAMZA-233
> Project: Samza
> Issue Type: Bug
> Components: kafka
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.7.0
>
> Attachments: SAMZA-233.0.patch
>
>
> We have seen an issue lately where, after some time, a container will stop
> receiving messages from some partitions.
> After examining the container, we believe this is triggered by an
> abdication for a BrokerProxy that was not instantiated during the
> KafkaSystemConsumer.start() method. Here's what we see:
> {noformat}
> 18:12:01,417 INFO KafkaSystemConsumer:128 - Abdicating for [MY-TOPIC,105]
> 18:12:01,418 INFO VerifiableProperties:68 - Verifying properties
> 18:12:01,419 INFO VerifiableProperties:68 - Property client.id is overridden to samza_consumer-job-thingy-i001-1395955794446-1
> 18:12:01,419 INFO VerifiableProperties:68 - Property metadata.broker.list is overridden to localhost:10251
> 18:12:01,419 INFO VerifiableProperties:68 - Property request.timeout.ms is overridden to 6000
> 18:12:01,420 INFO ClientUtils$:68 - Fetching metadata from broker id:0,host:localhost,port:10251 with correlation id 0 for 1 topic(s) Set(MY-TOPIC)
> 18:12:01,421 INFO SyncProducer:68 - Connected to localhost:10251 for producing
> 18:12:01,436 INFO SyncProducer:68 - Disconnecting from localhost:10251
> 18:12:01,437 INFO BrokerProxy:128 - Creating new SimpleConsumer for host localhost:10251 for system kafka
> 18:12:01,442 INFO GetOffset:128 - Validating offset 2872503 for topic and partition [MY-TOPIC,105]
> 18:12:01,456 INFO GetOffset:128 - Able to successfully read from offset 2872503 for topic and partition [MY-TOPIC,105]. Using it to instantiate consumer.
> {noformat}
> Notice that this log line from BrokerProxy.start never appears:
> {code}
> def start {
>   info("Starting " + toString)
>   ...
> }
> {code}
> Digging in a bit: KafkaSystemConsumer.refreshBrokers can create a new
> BrokerProxy, one that was not created in KafkaSystemConsumer.start(),
> whenever a partition has moved to a broker for which no proxy exists yet.
> {code}
> brokerOption match {
>   case Some(broker) =>
>     def createBrokerProxy = new BrokerProxy(broker.host, broker.port,
>       systemName, clientId, metrics, sink, timeout, bufferSize,
>       fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
>     brokerProxies
>       .getOrElseUpdate((broker.host, broker.port), createBrokerProxy)
>       .addTopicPartition(head, Option(nextOffset))
>   case None => warn("No such topic-partition: %s, dropping." format head)
> {code}
> But it never starts the new BrokerProxy's thread.
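Under change (2), the quoted snippet would roughly become "fetch or create, register the partition, then always call start". The sketch below illustrates that pattern only. `StubProxy` and `assignPartition` are hypothetical, and partitions are plain strings here, whereas the real code passes Kafka topic-partitions and offsets.

```scala
import scala.collection.mutable

// Hypothetical stub: records whether start() ran and which partitions it owns.
class StubProxy(val host: String, val port: Int) {
  private var started = false
  val partitions = mutable.ArrayBuffer[String]()
  def addTopicPartition(tp: String): Unit = partitions += tp
  // Idempotent, mirroring change (1): only the first call has an effect.
  def start(): Unit = if (!started) started = true
  def isStarted: Boolean = started
}

val brokerProxies = mutable.Map[(String, Int), StubProxy]()

// Fixed refresh step: fetch-or-create the proxy, register the partition,
// then always call start(). Because start() is idempotent, this is safe for
// proxies created in the consumer's start() and for ones created during
// abdication, which previously were never started.
def assignPartition(host: String, port: Int, tp: String): Unit = {
  val proxy = brokerProxies.getOrElseUpdate((host, port), new StubProxy(host, port))
  proxy.addTopicPartition(tp)
  proxy.start()
}

// Simulates [MY-TOPIC,105] moving to a broker that has no proxy yet,
// followed by a second partition landing on the same broker.
assignPartition("localhost", 10251, "[MY-TOPIC,105]")
assignPartition("localhost", 10251, "[MY-TOPIC,106]")
```

Calling start() unconditionally keeps the call site simple; the cost is that correctness now relies on start() being a no-op for an already-running proxy.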
--
This message was sent by Atlassian JIRA
(v6.2#6252)