[
https://issues.apache.org/jira/browse/SAMZA-1100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886453#comment-15886453
]
ASF GitHub Bot commented on SAMZA-1100:
---------------------------------------
GitHub user shanthoosh opened a pull request:
https://github.com/apache/samza/pull/68
SAMZA-1100: Exception when using a stream as both bootstrap and broadcast.
When a task input stream is used as both broadcast and bootstrap stream in
a samza job, Bootstrappingchooser marks the stream as bootstrapped when a
single task finishes consuming all the SystemStreamPartitions(This happens when
all the starting offset for each partition in the input stream is of type
upcoming). This patch fixes this, by marking a stream as bootstrapped, only
when all the systemStreamPartitions in a input stream is consumed by all the
expected tasks.
More details here : https://issues.apache.org/jira/browse/SAMZA-1100
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/shanthoosh/samza master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/samza/pull/68.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #68
----
commit 3a0c0c9315832d2a5d1af82ac35be2c10b5f37ae
Author: Shanthoosh Venkataraman <[email protected]>
Date: 2017-02-27T18:45:42Z
SAMZA-1100: Exception when using a stream as both bootstrap and broadcast.
----
> Exception when using a stream as both bootstrap and broadcast
> -------------------------------------------------------------
>
> Key: SAMZA-1100
> URL: https://issues.apache.org/jira/browse/SAMZA-1100
> Project: Samza
> Issue Type: Bug
> Reporter: Xinyu Liu
> Assignee: Shanthoosh Venkataraman
>
> The following exception is thrown after setting test-system.test-topic to be
> both broadcast and bootstrap stream:
> 017-02-22 20:20:45 SamzaContainer [ERROR] Caught exception in process loop.
> java.util.NoSuchElementException: key not found: SystemStream
> [system=test-system, stream=test-topic]
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:58)
> at scala.collection.MapLike$class.apply(MapLike.scala:141)
> at scala.collection.AbstractMap.apply(Map.scala:58)
> at
> org.apache.samza.system.chooser.BootstrappingChooser.checkOffset(BootstrappingChooser.scala:249)
> at
> org.apache.samza.system.chooser.BootstrappingChooser.register(BootstrappingChooser.scala:127)
> at
> org.apache.samza.system.chooser.DefaultChooser.register(DefaultChooser.scala:289)
> at
> org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:209)
> at
> org.apache.samza.container.TaskInstance$$anonfun$registerConsumers$2.apply(TaskInstance.scala:134)
> at
> org.apache.samza.container.TaskInstance$$anonfun$registerConsumers$2.apply(TaskInstance.scala:131)
> at
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
> org.apache.samza.container.TaskInstance.registerConsumers(TaskInstance.scala:131)
> at
> org.apache.samza.container.SamzaContainer$$anonfun$startConsumers$2.apply(SamzaContainer.scala:823)
> at
> org.apache.samza.container.SamzaContainer$$anonfun$startConsumers$2.apply(SamzaContainer.scala:823)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at
> org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:823)
> at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:687)
> at
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:109)
> at
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:83)
> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> 2017-02-22 20:20:45 SamzaContainer [INFO] Shutting down.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)