[ 
https://issues.apache.org/jira/browse/SAMZA-789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yi Pan (Data Infrastructure) updated SAMZA-789:
-----------------------------------------------
    Attachment: SAMZA-789-0.patch

The Review Board (RB) request is online as well: https://reviews.apache.org/r/39072/

> CoordinatorStreamSystemConsumer should not register the coordinator stream partition after the SystemConsumer has already started
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-789
>                 URL: https://issues.apache.org/jira/browse/SAMZA-789
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.10.0
>            Reporter: Yi Pan (Data Infrastructure)
>            Assignee: Yi Pan (Data Infrastructure)
>             Fix For: 0.10.0
>
>         Attachments: SAMZA-789-0.patch
>
>
> The following bug has been observed in 0.10:
> 1) When the coordinator stream is enabled in 0.10, the OffsetManager starts the CheckpointManager internally, which first registers and then starts the enclosed SystemConsumer.
> 2) After the container has started, the LocalityManager starts and registers the enclosed SystemConsumer again, even though it has already been started.
> This causes a problem in KafkaSystemConsumer: the following warnings are logged in an endless retry loop from the KafkaSystemConsumer.refreshBrokers() function:
> {code}
> 2015-10-06 17:10:24 BrokerProxy [DEBUG] Adding new topic and partition [__samza_coordinator_acg-test_i001,0] to queue for lca1-app0908.corp.linkedin.com
> 2015-10-06 17:10:24 KafkaSystemConsumer [WARN] While refreshing brokers for [__samza_coordinator_acg-test_i001,0]: org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]. Retrying.
> 2015-10-06 17:10:24 KafkaSystemConsumer [DEBUG] Exception detail:
> org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]
>       at org.apache.samza.system.kafka.Toss$class.toss(Toss.scala:27)
>       at org.apache.samza.system.kafka.BrokerProxy.toss(BrokerProxy.scala:51)
>       at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:96)
>       at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:165)
>       at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:178)
>       at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:144)
>       at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>       at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:143)
>       at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.refreshDropped(KafkaSystemConsumer.scala:195)
>       at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:142)
>       at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>       at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>       at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> The quick fix is to avoid registering the same coordinator stream partition after the CoordinatorStreamSystemConsumer has already started.
> The long-term fix is to manage the life-cycle states of the SystemConsumers more rigorously. This would use a state machine that allows only certain state transitions, e.g. init -> registered -> started -> stopped.
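
The following is a minimal Java sketch of the life-cycle state machine proposed above. It is written under stated assumptions and is not the patch attached to this issue.

The class `LifecycleGuardedConsumer`, its `register`/`registerIfNotStarted`/`start`/`stop` methods, and the plain string partition keys are all hypothetical and are not part of the Samza API. A real implementation would wrap the actual SystemConsumer and its partition type. The allowed transitions follow the sequence named in the issue. A self-loop on REGISTERED lets several partitions be registered before start.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical life-cycle guard (NOT a Samza class) illustrating the
 * init -> registered -> started -> stopped state machine from SAMZA-789.
 * Partitions are identified by plain strings for illustration only.
 */
final class LifecycleGuardedConsumer {

    /** Life-cycle states proposed in the issue. */
    enum State { INIT, REGISTERED, STARTED, STOPPED }

    // Each state maps to the set of states it is allowed to move to.
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.INIT, EnumSet.of(State.REGISTERED));
        // Self-loop: more partitions may be registered before start().
        ALLOWED.put(State.REGISTERED, EnumSet.of(State.REGISTERED, State.STARTED));
        // Once started, the only legal move is to stop; no late registration.
        ALLOWED.put(State.STARTED, EnumSet.of(State.STOPPED));
        ALLOWED.put(State.STOPPED, EnumSet.noneOf(State.class));
    }

    private State state = State.INIT;
    private final Set<String> registered = new HashSet<>();

    synchronized State state() {
        return state;
    }

    // Moves to the next state, or throws without changing state if illegal.
    private void transition(State next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException("Illegal transition " + state + " -> " + next);
        }
        state = next;
    }

    /** Long-term fix: registering after start() fails fast. */
    synchronized void register(String partition) {
        transition(State.REGISTERED);
        registered.add(partition);
    }

    /**
     * Quick-fix style helper: silently skips a partition that was already
     * registered before start(). Returns true only if this call registered it.
     * A new partition after start() still fails via register().
     */
    synchronized boolean registerIfNotStarted(String partition) {
        if (state == State.STARTED && registered.contains(partition)) {
            return false;
        }
        register(partition);
        return true;
    }

    synchronized void start() {
        transition(State.STARTED);
    }

    synchronized void stop() {
        transition(State.STOPPED);
    }
}
```

With a guard like this, the second `register()` call from the LocalityManager would fail immediately with an `IllegalStateException`. The error would point at the offending caller, instead of surfacing much later as the "Already consuming TopicPartition" retry loop in `refreshBrokers()`. The `registerIfNotStarted` helper corresponds to the quick fix. It only skips a partition that is already registered, so it does not hide other life-cycle mistakes.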



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
