[ https://issues.apache.org/jira/browse/SAMZA-233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini reassigned SAMZA-233:
-------------------------------------

    Assignee: Chris Riccomini

> BrokerProxies are never started when created during abdication
> --------------------------------------------------------------
>
>                 Key: SAMZA-233
>                 URL: https://issues.apache.org/jira/browse/SAMZA-233
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0
>
>
> We have recently seen an issue where, after some time, a container stops 
> receiving messages from some partitions.
> After examining the container, it appears that this is triggered by an 
> abdication that requires a BrokerProxy that was not instantiated in the 
> KafkaSystemConsumer.start() method. Here's what we see:
> {noformat}
> 18:12:01,417  INFO KafkaSystemConsumer:128 - Abdicating for [MY-TOPIC,105]
> 18:12:01,418  INFO VerifiableProperties:68 - Verifying properties
> 18:12:01,419  INFO VerifiableProperties:68 - Property client.id is overridden to samza_consumer-job-thingy-i001-1395955794446-1
> 18:12:01,419  INFO VerifiableProperties:68 - Property metadata.broker.list is overridden to localhost:10251
> 18:12:01,419  INFO VerifiableProperties:68 - Property request.timeout.ms is overridden to 6000
> 18:12:01,420  INFO ClientUtils$:68 - Fetching metadata from broker id:0,host:localhost,port:10251 with correlation id 0 for 1 topic(s) Set(MY-TOPIC)
> 18:12:01,421  INFO SyncProducer:68 - Connected to localhost:10251 for producing
> 18:12:01,436  INFO SyncProducer:68 - Disconnecting from localhost:10251
> 18:12:01,437  INFO BrokerProxy:128 - Creating new SimpleConsumer for host localhost:10251 for system kafka
> 18:12:01,442  INFO GetOffset:128 - Validating offset 2872503 for topic and partition [MY-TOPIC,105]
> 18:12:01,456  INFO GetOffset:128 - Able to successfully read from offset 2872503 for topic and partition [MY-TOPIC,105]. Using it to instantiate consumer.
> {noformat}
> Notice that the "Starting ..." log line emitted by BrokerProxy.start never 
> appears:
> {code}
>   def start {
>     info("Starting " + toString)
>     ...
>   }
> {code}
> Digging in a bit, KafkaSystemConsumer.refreshBrokers can create a new 
> BrokerProxy that wasn't created in the KafkaSystemConsumer.start() method. 
> This happens when a partition has moved to a broker for which the consumer 
> has not yet created a proxy.
> {code}
>           brokerOption match {
>             case Some(broker) =>
>               def createBrokerProxy = new BrokerProxy(
>                 broker.host, broker.port, systemName, clientId, metrics, sink,
>                 timeout, bufferSize, fetchSize, consumerMinSize,
>                 consumerMaxWait, offsetGetter)
>               brokerProxies
>                 .getOrElseUpdate((broker.host, broker.port), createBrokerProxy)
>                 .addTopicPartition(head, Option(nextOffset))
>             case None =>
>               warn("No such topic-partition: %s, dropping." format head)
>           }
> {code}
> But refreshBrokers never calls start on the newly created BrokerProxy, so 
> its thread is never started.
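
One way to close the gap described in the issue is to start each proxy at the moment it is created, so that a proxy first seen during a broker refresh is treated the same as one created at startup. The sketch below is illustrative only and is not necessarily the change made for SAMZA-233: FakeBrokerProxy, AbdicationSketch and proxyFor are hypothetical names standing in for the real Samza classes, and the "thread" is reduced to a started flag.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Samza's BrokerProxy; only what the sketch needs.
class FakeBrokerProxy(val host: String, val port: Int) {
  @volatile var started = false
  val partitions = mutable.Set[(String, Int)]()

  // In the real class this would start the fetcher thread.
  def start(): Unit = {
    println("Starting " + toString)
    started = true
  }

  def addTopicPartition(topic: String, partition: Int): Unit =
    partitions += ((topic, partition))

  override def toString: String = s"FakeBrokerProxy($host:$port)"
}

// Hypothetical holder for the proxy map, mirroring brokerProxies in the issue.
object AbdicationSketch {
  val brokerProxies = mutable.Map[(String, Int), FakeBrokerProxy]()

  // getOrElseUpdate evaluates its second argument only when the key is
  // absent, so start() runs exactly once per newly created proxy.
  def proxyFor(host: String, port: Int): FakeBrokerProxy =
    brokerProxies.getOrElseUpdate((host, port), {
      val proxy = new FakeBrokerProxy(host, port)
      proxy.start()
      proxy
    })
}
```

Calling AbdicationSketch.proxyFor("localhost", 10251) and then addTopicPartition("MY-TOPIC", 105) on the result yields a started proxy on first use and the same instance on later calls. Whether to start before or after adding the first partition is a design choice this sketch does not settle.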



--
This message was sent by Atlassian JIRA
(v6.2#6252)
