[ 
https://issues.apache.org/jira/browse/SAMZA-1728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494577#comment-16494577
 ] 

ASF GitHub Bot commented on SAMZA-1728:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/533


> BootstrappingChooser: Call checkOffset only for a lagging partition while 
> choosing.
> -----------------------------------------------------------------------------------
>
>                 Key: SAMZA-1728
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1728
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Aditya
>            Assignee: Aditya
>            Priority: Major
>
> We seem to be calling checkOffset even for the ssps that have finished 
> bootstrapping, resulting in decrementing systemStreamLagCounts but not 
> laggingSystemStreamPartitions as that ssp has already been removed from the 
> set. This results in the systemStream removed from systemStreamLagCounts 
> while there are still few lagging ssps for that system stream.
> if (comparatorResult != null && comparatorResult.intValue() >= 0) {
>   laggingSystemStreamPartitions -= systemStreamPartition
>   systemStreamLagCounts += systemStream -> 
> (systemStreamLagCounts(systemStream) - 1)
>   if (systemStreamLagCounts(systemStream) == 0) {
>     // If the lag count is 0, then no partition for this stream is lagging
>     // (the stream has been fully caught up).
>     systemStreamLagCounts -= systemStream
>   }
>  
> This results in the following exception:
> java.util.NoSuchElementException: key not found: SystemStream 
> [system=brooklin-espresso, stream=SampleBrooklinFunctionsIdentityProfileDS]
>  at scala.collection.MapLike$class.default(MapLike.scala:228)
>  at scala.collection.AbstractMap.default(Map.scala:58)
>  at scala.collection.MapLike$class.apply(MapLike.scala:141)
>  at scala.collection.AbstractMap.apply(Map.scala:58)
>  at 
> org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:281)
>  at 
> org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
>  at 
> org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
>  at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
>  at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
>  at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
>  at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:753)
>  at 
> org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
>  at 
> org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to