[
https://issues.apache.org/jira/browse/SAMZA-1728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494577#comment-16494577
]
ASF GitHub Bot commented on SAMZA-1728:
---------------------------------------
Github user asfgit closed the pull request at:
https://github.com/apache/samza/pull/533
> BootstrappingChooser: Call checkOffset only for a lagging partition while
> choosing.
> -----------------------------------------------------------------------------------
>
> Key: SAMZA-1728
> URL: https://issues.apache.org/jira/browse/SAMZA-1728
> Project: Samza
> Issue Type: Bug
> Reporter: Aditya
> Assignee: Aditya
> Priority: Major
>
> We appear to call checkOffset even for SSPs that have already finished
> bootstrapping. This decrements systemStreamLagCounts but leaves
> laggingSystemStreamPartitions unchanged, because that SSP has already been
> removed from the set. As a result, the systemStream is removed from
> systemStreamLagCounts while some SSPs for that system stream are still
> lagging. The code in question:
>
>   if (comparatorResult != null && comparatorResult.intValue() >= 0) {
>     laggingSystemStreamPartitions -= systemStreamPartition
>     systemStreamLagCounts += systemStream ->
>       (systemStreamLagCounts(systemStream) - 1)
>     if (systemStreamLagCounts(systemStream) == 0) {
>       // If the lag count is 0, then no partition for this stream is lagging
>       // (the stream has been fully caught up).
>       systemStreamLagCounts -= systemStream
>     }
>   }
>
> This results in the following exception:
> java.util.NoSuchElementException: key not found: SystemStream [system=brooklin-espresso, stream=SampleBrooklinFunctionsIdentityProfileDS]
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:58)
>   at scala.collection.MapLike$class.apply(MapLike.scala:141)
>   at scala.collection.AbstractMap.apply(Map.scala:58)
>   at org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:281)
>   at org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
>   at org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
>   at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
>   at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
>   at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
>   at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:753)
>   at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
>   at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)
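
For readers following the description above, here is a minimal, self-contained sketch of how the two collections can drift apart, and of a guard in the spirit of the issue title. It is a simplified model, not the Samza class: StreamKey, PartitionKey, LagTracker, markCaughtUp and markCaughtUpIfLagging are hypothetical names invented for illustration, and the offset comparison is assumed to have already succeeded. The actual change is in the pull request linked above.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-ins for Samza's SystemStream and SystemStreamPartition.
final case class StreamKey(system: String, stream: String)
final case class PartitionKey(stream: StreamKey, partition: Int)

// Simplified model of the lag bookkeeping quoted in the issue (not the Samza class).
final class LagTracker(lagging: Iterable[PartitionKey]) {
  val laggingPartitions: mutable.Set[PartitionKey] =
    mutable.Set.empty[PartitionKey] ++= lagging

  // Number of lagging partitions per stream, derived from the initial set.
  val lagCounts: mutable.Map[StreamKey, Int] = mutable.Map.empty[StreamKey, Int]
  laggingPartitions.foreach { p =>
    lagCounts(p.stream) = lagCounts.getOrElse(p.stream, 0) + 1
  }

  // Mirrors the quoted block: always decrements, even for a finished partition.
  def markCaughtUp(p: PartitionKey): Unit = {
    laggingPartitions -= p
    lagCounts += p.stream -> (lagCounts(p.stream) - 1)
    if (lagCounts(p.stream) == 0) lagCounts -= p.stream
  }

  // Guarded variant: only update the counters for a partition that is still lagging.
  def markCaughtUpIfLagging(p: PartitionKey): Unit =
    if (laggingPartitions.contains(p)) markCaughtUp(p)
}

object LagTrackerDemo {
  def main(args: Array[String]): Unit = {
    val s = StreamKey("sys", "bootstrap")
    val p0 = PartitionKey(s, 0)
    val p1 = PartitionKey(s, 1)

    val unguarded = new LagTracker(Seq(p0, p1))
    unguarded.markCaughtUp(p0)
    unguarded.markCaughtUp(p0) // repeated check for an already finished partition
    // The stream has been dropped from lagCounts although p1 is still lagging,
    // so the next lookup fails with NoSuchElementException.
    println(s"unguarded path throws: ${Try(unguarded.markCaughtUp(p1)).isFailure}")

    val guarded = new LagTracker(Seq(p0, p1))
    guarded.markCaughtUpIfLagging(p0)
    guarded.markCaughtUpIfLagging(p0) // ignored: p0 is no longer lagging
    guarded.markCaughtUpIfLagging(p1)
    println(s"guarded counts empty: ${guarded.lagCounts.isEmpty}")
  }
}
```

In this model the second call for p0 drives the stream's count to zero and removes its key while p1 is still in the lagging set, which is the state the stack trace above points to. The guard keeps the count and the set in step by skipping partitions that have already caught up.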
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)