Yi Pan (Data Infrastructure) created SAMZA-789:
--------------------------------------------------
Summary: CoordinatorStreamSystemConsumer should not register the
coordinator stream partition after the SystemConsumer has already started
Key: SAMZA-789
URL: https://issues.apache.org/jira/browse/SAMZA-789
Project: Samza
Issue Type: Bug
Affects Versions: 0.10.0
Reporter: Yi Pan (Data Infrastructure)
Assignee: Yi Pan (Data Infrastructure)
Fix For: 0.10.0
The following bug has been observed in 0.10:
1) When the coordinator stream is enabled in 0.10, the OffsetManager internally
starts the CheckpointManager, and the enclosed SystemConsumer is registered and
started first.
2) After the container has started, the LocalityManager starts and registers the
enclosed SystemConsumer again, even though it has already been started.
This breaks KafkaSystemConsumer, which emits the following warning (with its
debug-level exception detail) in an endless loop from
KafkaSystemConsumer.refreshBrokers():
{code}
2015-10-06 17:10:24 BrokerProxy [DEBUG] Adding new topic and partition [__samza_coordinator_acg-test_i001,0] to queue for lca1-app0908.corp.linkedin.com
2015-10-06 17:10:24 KafkaSystemConsumer [WARN] While refreshing brokers for [__samza_coordinator_acg-test_i001,0]: org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]. Retrying.
2015-10-06 17:10:24 KafkaSystemConsumer [DEBUG] Exception detail:
org.apache.samza.SamzaException: Already consuming TopicPartition [__samza_coordinator_acg-test_i001,0]
	at org.apache.samza.system.kafka.Toss$class.toss(Toss.scala:27)
	at org.apache.samza.system.kafka.BrokerProxy.toss(BrokerProxy.scala:51)
	at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:96)
	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:165)
	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:178)
	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:144)
	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
	at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:143)
	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.refreshDropped(KafkaSystemConsumer.scala:195)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:142)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
	at java.lang.Thread.run(Thread.java:745)
{code}
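The failure mode can be modeled in a few lines. This is a hypothetical toy, not Samza code: `DoubleRegisterDemo`, `ToyBrokerProxy`, and `ToyConsumer` only mimic what the log shows, namely that adding a partition that is already being consumed throws "Already consuming TopicPartition", and that the refresh path re-queues the partition and retries. The retries are bounded here so the demo terminates; in the reported bug they never stop.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

class DoubleRegisterDemo {
    // Hypothetical stand-in for BrokerProxy: remembers partitions it already consumes.
    static class ToyBrokerProxy {
        private final Set<String> consuming = new HashSet<>();

        void addTopicPartition(String tp) {
            if (!consuming.add(tp)) {
                // Same message as in the log; the real exception type is SamzaException.
                throw new IllegalStateException("Already consuming TopicPartition " + tp);
            }
        }
    }

    // Hypothetical stand-in for the consumer: register() always queues the
    // partition, even after start(), which is the behavior this issue is about.
    static class ToyConsumer {
        final ToyBrokerProxy proxy = new ToyBrokerProxy();
        final Deque<String> toRefresh = new ArrayDeque<>();
        int failedAttempts = 0;

        void register(String tp) { toRefresh.add(tp); }

        void start() { refresh(toRefresh.size()); }

        // Hands queued partitions to the proxy; a failed partition is re-queued
        // ("Retrying."), so an already-consumed partition can never drain.
        void refresh(int maxAttempts) {
            for (int i = 0; i < maxAttempts && !toRefresh.isEmpty(); i++) {
                String tp = toRefresh.poll();
                try {
                    proxy.addTopicPartition(tp);
                } catch (IllegalStateException e) {
                    failedAttempts++;
                    toRefresh.add(tp);
                }
            }
        }
    }

    public static void main(String[] args) {
        String tp = "[__samza_coordinator_acg-test_i001,0]";
        ToyConsumer consumer = new ToyConsumer();
        consumer.register(tp); // step 1: OffsetManager / CheckpointManager path
        consumer.start();
        consumer.register(tp); // step 2: LocalityManager path, after start
        consumer.refresh(5);   // bounded only for the demo
        System.out.println("failed attempts: " + consumer.failedAttempts);
        System.out.println("still queued: " + consumer.toRefresh.size());
    }
}
```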
The quick fix is to avoid registering the same coordinator stream system
partition again after the CoordinatorStreamSystemConsumer has already started.
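A minimal sketch of that quick fix, in Java. `GuardedRegistration`, `Underlying`, `GuardedCoordinatorConsumer`, and `RecordingConsumer` are hypothetical names; the real CoordinatorStreamSystemConsumer and SystemConsumer signatures are not reproduced. The wrapper only remembers whether start() has been called and which partitions were registered, and drops a repeat registration once started instead of passing it down to the Kafka consumer. Rejecting a brand-new partition after start is an extra assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class GuardedRegistration {
    // Hypothetical minimal view of the wrapped consumer (not Samza's interface).
    interface Underlying {
        void register(String partition, String offset);
        void start();
    }

    // Once started, a repeat registration of an already-registered partition
    // is ignored; before start, registrations are forwarded unchanged.
    static class GuardedCoordinatorConsumer {
        private final Underlying delegate;
        private final Set<String> registered = new HashSet<>();
        private boolean started = false;
        int skippedRegistrations = 0;

        GuardedCoordinatorConsumer(Underlying delegate) { this.delegate = delegate; }

        synchronized void register(String partition, String offset) {
            if (started) {
                if (registered.contains(partition)) {
                    skippedRegistrations++; // already consuming it: nothing to do
                    return;
                }
                // Assumption of this sketch: new partitions cannot join after start.
                throw new IllegalStateException(
                    "Cannot register new partition " + partition + " after start");
            }
            registered.add(partition);
            delegate.register(partition, offset);
        }

        synchronized void start() {
            if (started) return; // a second start() is also a no-op
            delegate.start();
            started = true;
        }
    }

    // Records the calls that actually reach the wrapped consumer.
    static class RecordingConsumer implements Underlying {
        final List<String> calls = new ArrayList<>();
        public void register(String partition, String offset) { calls.add("register " + partition); }
        public void start() { calls.add("start"); }
    }

    public static void main(String[] args) {
        RecordingConsumer raw = new RecordingConsumer();
        GuardedCoordinatorConsumer consumer = new GuardedCoordinatorConsumer(raw);
        String tp = "__samza_coordinator_acg-test_i001-0";
        consumer.register(tp, null); // OffsetManager / CheckpointManager path
        consumer.start();
        consumer.register(tp, null); // LocalityManager path: ignored
        System.out.println(raw.calls); // [register __samza_coordinator_acg-test_i001-0, start]
    }
}
```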
The long-term fix is to manage the lifecycle states of SystemConsumers more
rigorously, with a state machine that only allows certain transitions,
e.g. init -> registered -> started -> stopped.
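One way such a state machine could look, as a hedged Java sketch. `ConsumerLifecycle` and its transition table are hypothetical. The table encodes the init -> registered -> started -> stopped chain, plus an assumed registered -> registered self-loop so several partitions can be registered before start. Any other transition, including the register-after-start sequence from this issue, fails fast with an IllegalStateException instead of reaching the Kafka consumer.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class ConsumerLifecycle {
    enum State { INIT, REGISTERED, STARTED, STOPPED }

    // Allowed transitions; anything not listed is rejected.
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.INIT, EnumSet.of(State.REGISTERED));
        // Assumption: several partitions may be registered before start.
        ALLOWED.put(State.REGISTERED, EnumSet.of(State.REGISTERED, State.STARTED));
        ALLOWED.put(State.STARTED, EnumSet.of(State.STOPPED));
        ALLOWED.put(State.STOPPED, EnumSet.noneOf(State.class));
    }

    private State state = State.INIT;

    public synchronized State state() { return state; }

    private synchronized void transitionTo(State next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException("Illegal transition " + state + " -> " + next);
        }
        state = next;
    }

    // The partition argument is unused here; a real consumer would record it.
    public void register(String partition) { transitionTo(State.REGISTERED); }
    public void start() { transitionTo(State.STARTED); }
    public void stop() { transitionTo(State.STOPPED); }

    public static void main(String[] args) {
        ConsumerLifecycle c = new ConsumerLifecycle();
        c.register("p0");
        c.start();
        try {
            c.register("p0"); // the SAMZA-789 sequence: register after start
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Illegal transition STARTED -> REGISTERED
        }
        c.stop();
        System.out.println(c.state()); // STOPPED
    }
}
```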