[ https://issues.apache.org/jira/browse/SAMZA-1728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488519#comment-16488519 ]
ASF GitHub Bot commented on SAMZA-1728:
---------------------------------------
GitHub user atoomula opened a pull request:
https://github.com/apache/samza/pull/533
SAMZA-1728: BootstrappingChooser - Call checkOffset only for a lagging
partition while choosing.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/atoomula/samza chooser
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/samza/pull/533.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #533
----
commit ac1d1c813b01ba3d57d2b1cb1aa7687766aed225
Author: Aditya Toomula <atoomula@...>
Date: 2018-05-24T06:45:45Z
SAMZA-1728: BootstrappingChooser - Call checkOffset only for a lagging
partition while choosing.
----
> BootstrappingChooser: Call checkOffset only for a lagging partition while
> choosing.
> -----------------------------------------------------------------------------------
>
> Key: SAMZA-1728
> URL: https://issues.apache.org/jira/browse/SAMZA-1728
> Project: Samza
> Issue Type: Bug
> Reporter: Aditya
> Assignee: Aditya
> Priority: Major
>
> We seem to be calling checkOffset even for SSPs that have already finished
> bootstrapping. Each such call decrements systemStreamLagCounts again but
> leaves laggingSystemStreamPartitions unchanged, because that SSP has already
> been removed from the set. As a result, the systemStream is removed from
> systemStreamLagCounts while some SSPs of that system stream are still
> lagging. The relevant block:
>
> if (comparatorResult != null && comparatorResult.intValue() >= 0) {
>   laggingSystemStreamPartitions -= systemStreamPartition
>   systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
>   if (systemStreamLagCounts(systemStream) == 0) {
>     // If the lag count is 0, then no partition for this stream is lagging
>     // (the stream has been fully caught up).
>     systemStreamLagCounts -= systemStream
>   }
> }
>
> This results in the following exception:
> java.util.NoSuchElementException: key not found: SystemStream [system=brooklin-espresso, stream=SampleBrooklinFunctionsIdentityProfileDS]
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:58)
>     at scala.collection.MapLike$class.apply(MapLike.scala:141)
>     at scala.collection.AbstractMap.apply(Map.scala:58)
>     at org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:281)
>     at org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
>     at org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
>     at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
>     at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:753)
>     at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
>     at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)
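
The failure mode described in the issue can be reproduced with a small stand-alone model. The sketch below is not the Samza class: StreamId, Partition, LagTracker and both method names are hypothetical stand-ins, and the guarded variant is only an assumption about what "call checkOffset only for a lagging partition" could look like, based on the pull request title. It shows that repeating the bookkeeping for an already caught-up partition drops the stream's count while another partition is still lagging, so the next lookup throws.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Samza's SystemStream and SystemStreamPartition.
final case class StreamId(system: String, stream: String)
final case class Partition(stream: StreamId, id: Int)

// Simplified model of the chooser's lag bookkeeping (an assumption, not the real class).
class LagTracker(initiallyLagging: Set[Partition]) {
  val laggingSsps: mutable.Set[Partition] = mutable.Set.empty[Partition]
  val lagCounts: mutable.Map[StreamId, Int] = mutable.Map.empty[StreamId, Int]

  laggingSsps ++= initiallyLagging
  initiallyLagging.foreach { p =>
    lagCounts(p.stream) = lagCounts.getOrElse(p.stream, 0) + 1
  }

  // Mirrors the quoted block for a partition that has reached its newest offset.
  // Map.apply throws NoSuchElementException once the stream key has been removed.
  def markCaughtUp(p: Partition): Unit = {
    laggingSsps -= p
    lagCounts += p.stream -> (lagCounts(p.stream) - 1)
    if (lagCounts(p.stream) == 0) lagCounts -= p.stream
  }

  // Guarded variant: only touch the counts for a partition that is still lagging.
  def markCaughtUpIfLagging(p: Partition): Unit =
    if (laggingSsps.contains(p)) markCaughtUp(p)
}

object LagCountDemo {
  def main(args: Array[String]): Unit = {
    val s = StreamId("sys", "bootstrap")
    val p0 = Partition(s, 0)
    val p1 = Partition(s, 1)

    val unguarded = new LagTracker(Set(p0, p1))
    unguarded.markCaughtUp(p0) // count 2 -> 1
    unguarded.markCaughtUp(p0) // same partition again: count 1 -> 0, stream key removed
    val result = scala.util.Try(unguarded.markCaughtUp(p1)) // p1 was still lagging
    println(s"unguarded call failed: ${result.isFailure}")

    val guarded = new LagTracker(Set(p0, p1))
    guarded.markCaughtUpIfLagging(p0)
    guarded.markCaughtUpIfLagging(p0) // no-op: p0 is no longer in the lagging set
    guarded.markCaughtUpIfLagging(p1)
    println(s"guarded tracker drained: ${guarded.lagCounts.isEmpty}")
  }
}
```

In this model the guard keeps the per-stream count equal to the number of that stream's partitions remaining in the lagging set, which is the invariant the quoted block relies on.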
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)