[ https://issues.apache.org/jira/browse/SAMZA-567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14354475#comment-14354475 ]
Yan Fang commented on SAMZA-567:
--------------------------------
Sure. Changed to you.
> Can't interact with KV store from InitableTask.init()
> -----------------------------------------------------
>
> Key: SAMZA-567
> URL: https://issues.apache.org/jira/browse/SAMZA-567
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.8.0
> Reporter: Tommy Becker
> Assignee: Yan Fang
> Fix For: 0.9.0
>
>
> Attempting to interact with the KeyValueStore from InitableTask.init()
> results in a rather obscure exception:
> java.util.NoSuchElementException: key not found: TaskName-Partition 3
>     at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
>     at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
>     at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
>     at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
>     at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
>     at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
>     at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
>     at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
>     at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
> ...
> After some investigation I see that it's actually not safe to do anything
> that is going to potentially produce messages from init(), because startTask
> is called before startProducers in SamzaContainer.run. Interaction with the
> KV store results in writes to the changelog, resulting in the above
> exception. Conceptually, it seems like the producers should be initialized
> first to prevent this, but I have no idea what the side-effects of doing that
> would be. Minimally, I'd like to see this behavior documented and a more
> obvious failure such as an IllegalStateException.
> Discussion that precipitated this issue:
> http://mail-archives.apache.org/mod_mbox/samza-dev/201502.mbox/%[email protected]%3e
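
The fail-fast behavior requested in the description (an IllegalStateException
instead of an obscure "key not found") could look roughly like the sketch
below. This is only an illustration of the ordering problem: GuardedProducers
and StartupOrderDemo are hypothetical names, not Samza classes, and the sketch
does not claim to show how Samza implements or fixed this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a producer registry (NOT Samza's SystemProducers).
final class GuardedProducers {
    private final Map<String, List<String>> sentBySource = new HashMap<>();
    private boolean started = false;

    // Registers a source (for example a task name) that may send later.
    void register(String source) {
        sentBySource.put(source, new ArrayList<>());
    }

    // Marks the producers as ready to accept messages.
    void start() {
        started = true;
    }

    // Fails fast with a descriptive error if called before start().
    void send(String source, String message) {
        if (!started) {
            throw new IllegalStateException(
                "Cannot send from '" + source + "' before producers are started");
        }
        List<String> sent = sentBySource.get(source);
        if (sent == null) {
            throw new IllegalArgumentException("Source not registered: " + source);
        }
        sent.add(message);
    }
}

// Hypothetical driver that models the two possible startup orders.
final class StartupOrderDemo {
    // Simulates a task init() whose store write triggers a changelog send.
    // Returns "ok" on success, otherwise the guard's error message.
    static String runInit(boolean startProducersFirst) {
        GuardedProducers producers = new GuardedProducers();
        producers.register("TaskName-Partition 3");
        if (startProducersFirst) {
            producers.start();
        }
        try {
            producers.send("TaskName-Partition 3", "changelog entry");
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("init before start: " + runInit(false));
        System.out.println("init after start:  " + runInit(true));
    }
}
```

With the guard in place, a send issued before start() reports the ordering
mistake directly, while the same send after start() succeeds.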