[ 
https://issues.apache.org/jira/browse/SAMZA-567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367875#comment-14367875
 ] 

Naveen commented on SAMZA-567:
------------------------------

Review Board request here: https://reviews.apache.org/r/32217/

> Can't interact with KV store from InitiableTask.init()
> ------------------------------------------------------
>
>                 Key: SAMZA-567
>                 URL: https://issues.apache.org/jira/browse/SAMZA-567
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Tommy Becker
>            Assignee: Naveen
>             Fix For: 0.9.0
>
>
> Attempting to interact with the KeyValueStore from InitiableTask.init() 
> results in a rather obscure exception:
> java.util.NoSuchElementException: key not found: TaskName-Partition 3
>       at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
>       at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
>       at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
>       at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
>       at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
>       at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
>       at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
>       at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
>       at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
> ...
> After some investigation, I see that it is not safe to do anything in
> init() that might produce messages, because startTask is called before
> startProducers in SamzaContainer.run. Interacting with the KV store writes
> to the changelog, which results in the above exception. Conceptually, it
> seems the producers should be initialized first to prevent this, but I have
> no idea what the side effects of doing that would be. At a minimum, I'd like
> to see this behavior documented and a more obvious failure, such as an
> IllegalStateException.
> Discussion that precipitated this issue:
> http://mail-archives.apache.org/mod_mbox/samza-dev/201502.mbox/%[email protected]%3e

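The ordering problem described in the report can be illustrated with a minimal sketch. Everything in it is a hypothetical stand-in (Producers, LoggedStore, and runContainer are not Samza's classes or methods), and it assumes only what the report states: a store write from init() produces a changelog message, and producing before the producers are started fails. The sketch fails with the IllegalStateException the report asks for, rather than the obscure lookup error.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these are Samza classes.
class InitOrderSketch {

    /** Stand-in for the container's producers: sending is only legal after start(). */
    static class Producers {
        private boolean started = false;
        final List<String> sent = new ArrayList<>();

        void start() {
            started = true;
        }

        void send(String source, String message) {
            if (!started) {
                // An explicit failure instead of an obscure "key not found" lookup error.
                throw new IllegalStateException(
                        "producers not started; cannot send from " + source);
            }
            sent.add(source + ":" + message);
        }
    }

    /** Stand-in for a changelog-backed store: every put also produces a message. */
    static class LoggedStore {
        private final Producers producers;
        private final String source;

        LoggedStore(Producers producers, String source) {
            this.producers = producers;
            this.source = source;
        }

        void put(String key, String value) {
            producers.send(source, key + "=" + value);
        }
    }

    /** Simulates a store write from a task's init(), before or after the producers start. */
    static String runContainer(boolean startProducersFirst) {
        Producers producers = new Producers();
        LoggedStore store = new LoggedStore(producers, "TaskName-Partition 3");
        try {
            if (startProducersFirst) {
                producers.start();
            }
            store.put("k", "v"); // what a task's init() would do
            if (!startProducersFirst) {
                producers.start();
            }
            return "ok";
        } catch (IllegalStateException e) {
            return "IllegalStateException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runContainer(false)); // init before producers start
        System.out.println(runContainer(true));  // producers started first
    }
}
```

In this sketch, starting the producers before running init is enough to make the store write succeed. Whether reordering has other side effects in the real container is the open question raised in the report.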


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
