[ 
https://issues.apache.org/jira/browse/SAMZA-1728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488519#comment-16488519
 ] 

ASF GitHub Bot commented on SAMZA-1728:
---------------------------------------

GitHub user atoomula opened a pull request:

    https://github.com/apache/samza/pull/533

    SAMZA-1728: BootstrappingChooser - Call checkOffset only for a lagging 
partition while choosing.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/atoomula/samza chooser

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/533.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #533
    
----
commit ac1d1c813b01ba3d57d2b1cb1aa7687766aed225
Author: Aditya Toomula <atoomula@...>
Date:   2018-05-24T06:45:45Z

    SAMZA-1728: BootstrappingChooser - Call checkOffset only for a lagging 
partition while choosing.

----


> BootstrappingChooser: Call checkOffset only for a lagging partition while 
> choosing.
> -----------------------------------------------------------------------------------
>
>                 Key: SAMZA-1728
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1728
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Aditya
>            Assignee: Aditya
>            Priority: Major
>
> We seem to be calling checkOffset even for the ssps that have finished 
> bootstrapping, resulting in decrementing systemStreamLagCounts but not 
> laggingSystemStreamPartitions as that ssp has already been removed from the 
> set. This results in the systemStream removed from systemStreamLagCounts 
> while there are still few lagging ssps for that system stream.
> if (comparatorResult != null && comparatorResult.intValue() >= 0) {
>   laggingSystemStreamPartitions -= systemStreamPartition
>   systemStreamLagCounts += systemStream -> 
> (systemStreamLagCounts(systemStream) - 1)
>   if (systemStreamLagCounts(systemStream) == 0) {
>     // If the lag count is 0, then no partition for this stream is lagging
>     // (the stream has been fully caught up).
>     systemStreamLagCounts -= systemStream
>   }
>  
> This results in the following exception:
> java.util.NoSuchElementException: key not found: SystemStream 
> [system=brooklin-espresso, stream=SampleBrooklinFunctionsIdentityProfileDS]
>  at scala.collection.MapLike$class.default(MapLike.scala:228)
>  at scala.collection.AbstractMap.default(Map.scala:58)
>  at scala.collection.MapLike$class.apply(MapLike.scala:141)
>  at scala.collection.AbstractMap.apply(Map.scala:58)
>  at 
> org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:281)
>  at 
> org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
>  at 
> org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
>  at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
>  at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
>  at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
>  at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:753)
>  at 
> org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
>  at 
> org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to