Aditya created SAMZA-1728:
-----------------------------
Summary: BootstrappingChooser: Call checkOffset only for a lagging
partition while choosing.
Key: SAMZA-1728
URL: https://issues.apache.org/jira/browse/SAMZA-1728
Project: Samza
Issue Type: Bug
Reporter: Aditya
Assignee: Aditya
We seem to be calling checkOffset even for ssps that have already finished
bootstrapping. Each such call decrements systemStreamLagCounts but leaves
laggingSystemStreamPartitions unchanged, because that ssp was already removed
from the set. As a result, the systemStream is removed from
systemStreamLagCounts while a few ssps of that system stream are still lagging.
    if (comparatorResult != null && comparatorResult.intValue() >= 0) {
      laggingSystemStreamPartitions -= systemStreamPartition
      systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
      if (systemStreamLagCounts(systemStream) == 0) {
        // If the lag count is 0, then no partition for this stream is lagging
        // (the stream has been fully caught up).
        systemStreamLagCounts -= systemStream
      }
    }
This results in the following exception:
java.util.NoSuchElementException: key not found: SystemStream [system=brooklin-espresso, stream=SampleBrooklinFunctionsIdentityProfileDS]
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:281)
    at org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
    at org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
    at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
    at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:753)
    at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
    at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)
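The failure mode described above can be reproduced in isolation. The following
is a minimal sketch, not the actual BootstrappingChooser code and not the patch
for this issue: StreamId, PartitionId, LagTracker and both method names are
hypothetical stand-ins, and the guard is placed inside the update rather than
at the call site in choose. It only illustrates why updating the counters for
an ssp that is no longer lagging drops the stream's entry too early, and how
checking membership in the lagging set first avoids that.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-ins for Samza's SystemStream and SystemStreamPartition.
final case class StreamId(system: String, stream: String)
final case class PartitionId(stream: StreamId, partition: Int)

// Simplified model of the lag bookkeeping; an assumption, not the real class.
class LagTracker(ssps: Set[PartitionId]) {
  val lagging: mutable.Set[PartitionId] = mutable.Set.empty[PartitionId]
  val lagCounts: mutable.Map[StreamId, Int] = mutable.Map.empty[StreamId, Int]

  // Initially every partition is lagging; count lagging partitions per stream.
  ssps.foreach { ssp =>
    lagging += ssp
    lagCounts(ssp.stream) = lagCounts.getOrElse(ssp.stream, 0) + 1
  }

  // Unguarded update, mirroring the snippet quoted in this report.
  def markCaughtUpUnguarded(ssp: PartitionId): Unit = {
    lagging -= ssp
    lagCounts += ssp.stream -> (lagCounts(ssp.stream) - 1)
    if (lagCounts(ssp.stream) == 0) {
      lagCounts -= ssp.stream
    }
  }

  // Guarded update: touch the counters only while the partition is still lagging.
  def markCaughtUpGuarded(ssp: PartitionId): Unit =
    if (lagging.contains(ssp)) markCaughtUpUnguarded(ssp)
}

object LagTrackerDemo {
  def main(args: Array[String]): Unit = {
    val stream = StreamId("sys", "s")
    val p0 = PartitionId(stream, 0)
    val p1 = PartitionId(stream, 1)

    val unguarded = new LagTracker(Set(p0, p1))
    unguarded.markCaughtUpUnguarded(p0)
    unguarded.markCaughtUpUnguarded(p0) // repeated call for a caught-up partition
    // p1 is still lagging, yet the stream's count entry is already gone.
    println(s"p1 lagging: ${unguarded.lagging.contains(p1)}")
    println(s"count entry present: ${unguarded.lagCounts.contains(stream)}")
    // The next update for p1 looks up the missing key and fails.
    println(s"next update fails: ${Try(unguarded.markCaughtUpUnguarded(p1)).isFailure}")

    val guarded = new LagTracker(Set(p0, p1))
    guarded.markCaughtUpGuarded(p0)
    guarded.markCaughtUpGuarded(p0) // no-op: p0 is no longer in the lagging set
    println(s"guarded count: ${guarded.lagCounts(stream)}")
  }
}
```

In the unguarded tracker the second call for p0 drives the count for the stream
to zero and removes its entry, so the later update for p1 throws
java.util.NoSuchElementException, matching the trace above. With the guard, the
count stays at 1 until p1 itself catches up.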