Tommy Becker created SAMZA-567:
----------------------------------
Summary: Can't interact with KV store from InitableTask.init()
Key: SAMZA-567
URL: https://issues.apache.org/jira/browse/SAMZA-567
Project: Samza
Issue Type: Bug
Components: container
Affects Versions: 0.8.0
Reporter: Tommy Becker
Fix For: 0.9.0
Attempting to interact with the KeyValueStore from InitableTask.init() results
in a rather obscure exception:
java.util.NoSuchElementException: key not found: TaskName-Partition 3
at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
...
After some investigation, I see that it is not safe to do anything in init()
that could produce messages, because startTask is called before startProducers
in SamzaContainer.run. Interacting with the KV store writes to the changelog,
which results in the above exception.
Conceptually, it seems like the producers should be initialized first to
prevent this, but I have no idea what the side effects of doing that would be.
At a minimum, I'd like to see this behavior documented and a more obvious
failure, such as an IllegalStateException.
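To illustrate the ordering problem and the requested fail-fast behavior, here is
a minimal, self-contained sketch. It is a toy model, not Samza code:
ToyProducers, its register/start/send methods, and the failFast flag are all
hypothetical names invented for this example, and the assumption that
per-source state only exists after start() is a simplification of whatever
SystemProducers does internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

public class ProducerOrderingSketch {

    /** Toy producer registry (hypothetical; NOT Samza's SystemProducers). */
    static class ToyProducers {
        private final List<String> registered = new ArrayList<>();
        private final Map<String, List<String>> buffers = new HashMap<>();
        private boolean started = false;

        void register(String source) {
            registered.add(source);
        }

        /** Assumption: per-source buffers only exist once start() has run. */
        void start() {
            for (String source : registered) {
                buffers.put(source, new ArrayList<>());
            }
            started = true;
        }

        void send(String source, String message, boolean failFast) {
            if (failFast && !started) {
                // The more obvious failure requested in this issue.
                throw new IllegalStateException(
                    "Cannot send for '" + source + "' before producers are started");
            }
            List<String> buffer = buffers.get(source);
            if (buffer == null) {
                // Same flavour as the "key not found" error in the trace above.
                throw new NoSuchElementException("key not found: " + source);
            }
            buffer.add(message);
        }
    }

    /** Sends (as init() would) before start(); returns the exception's simple name. */
    static String initBeforeStart(boolean failFast) {
        ToyProducers producers = new ToyProducers();
        producers.register("TaskName-Partition 3");
        try {
            producers.send("TaskName-Partition 3", "changelog write", failFast);
            producers.start();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    /** Starts the producers first, then sends; returns "none" on success. */
    static String startBeforeInit() {
        ToyProducers producers = new ToyProducers();
        producers.register("TaskName-Partition 3");
        try {
            producers.start();
            producers.send("TaskName-Partition 3", "changelog write", true);
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(initBeforeStart(false)); // NoSuchElementException
        System.out.println(initBeforeStart(true));  // IllegalStateException
        System.out.println(startBeforeInit());      // none
    }
}
```

In this model, either reordering startup or adding the explicit state check
removes the obscure failure; which of the two is appropriate for the real
container is exactly the open question raised above.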
Discussion that precipitated this issue:
http://mail-archives.apache.org/mod_mbox/samza-dev/201502.mbox/%[email protected]%3e