[ https://issues.apache.org/jira/browse/SAMZA-567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14323232#comment-14323232 ]

Chris Riccomini commented on SAMZA-567:
---------------------------------------

I took a quick look at this. I think we should just change the startup order
in SamzaContainer to:

{noformat}
      startMetrics
      startOffsetManager
      startStores
      startProducers
      startConsumers
      startTask
{noformat}

Similarly, we should change the shutdown order to:

{noformat}
      shutdownConsumers
      shutdownProducers
      shutdownTask
      shutdownStores
      shutdownOffsetManager
      shutdownMetrics
{noformat}

I think this should be safe to do. It will allow us to treat the init()
method just like a process method, which makes a lot of sense: init() is
essentially the first invocation of process, just without a message.
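To illustrate why the reordering makes init() safe, here is a minimal Java sketch (the real SamzaContainer is written in Scala). Everything in it is hypothetical: `ToyProducers` stands in for SystemProducers and throws if `send` is called before `start`, and the "task" step models an init() that writes to a logged KV store and therefore sends a changelog message. `TASK_BEFORE_PRODUCERS` captures only the relation reported in the issue (startTask before startProducers), not the container's full current order.

```java
import java.util.ArrayList;
import java.util.List;

public class StartupOrderSketch {
    // Hypothetical stand-in for SystemProducers: rejects sends before start().
    static class ToyProducers {
        private boolean started = false;
        final List<String> sent = new ArrayList<>();

        void start() { started = true; }

        void send(String message) {
            if (!started) {
                throw new IllegalStateException("producers not started");
            }
            sent.add(message);
        }
    }

    // Only the relation reported in the issue: startTask before startProducers.
    static final List<String> TASK_BEFORE_PRODUCERS = List.of("task", "producers");

    // The startup order proposed in this comment.
    static final List<String> PROPOSED_ORDER = List.of(
        "metrics", "offsetManager", "stores", "producers", "consumers", "task");

    // Runs the steps in order; the "task" step models an init() that
    // writes to a logged store and therefore sends a changelog message.
    static ToyProducers start(List<String> order) {
        ToyProducers producers = new ToyProducers();
        for (String step : order) {
            switch (step) {
                case "producers":
                    producers.start();
                    break;
                case "task":
                    producers.send("changelog write from init()");
                    break;
                default:
                    // metrics, offsetManager, stores, consumers: no-ops here
                    break;
            }
        }
        return producers;
    }

    static boolean initSucceeds(List<String> order) {
        try {
            start(order);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(initSucceeds(TASK_BEFORE_PRODUCERS)); // false
        System.out.println(initSucceeds(PROPOSED_ORDER));        // true
    }
}
```

With producers started before the task, the init()-time send goes through exactly as it would from process().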

> Can't interact with KV store from InitiableTask.init()
> ------------------------------------------------------
>
>                 Key: SAMZA-567
>                 URL: https://issues.apache.org/jira/browse/SAMZA-567
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Tommy Becker
>             Fix For: 0.9.0
>
>
> Attempting to interact with the KeyValueStore from InitableTask.init()
> results in a rather obscure exception:
> java.util.NoSuchElementException: key not found: TaskName-Partition 3
>       at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
>       at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
>       at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
>       at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
>       at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
>       at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
>       at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
>       at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
>       at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
>       at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
> ...
> After some investigation, I see that it's actually not safe to do anything
> in init() that might produce messages, because startTask is called before
> startProducers in SamzaContainer.run. Interacting with the KV store results
> in writes to the changelog, which cause the above exception. Conceptually,
> it seems like the producers should be initialized first to prevent this, but
> I have no idea what the side effects of doing that would be. At a minimum,
> I'd like to see this behavior documented, and a more obvious failure, such
> as an IllegalStateException.
> Discussion that precipitated this issue:
> http://mail-archives.apache.org/mod_mbox/samza-dev/201502.mbox/%[email protected]%3e
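The trace also shows why even a read can fail: CachedStore.get inserts into its LRU map, the insertion triggers removeEldestEntry, and the eviction flushes pending writes through LoggedStore.putAll to the changelog producer. The sketch below reproduces that chain with a hypothetical write-back cache built on `java.util.LinkedHashMap` (access-ordered, with an overridden `removeEldestEntry`), and uses the fail-fast IllegalStateException the reporter asks for. `ToyProducer` and `ToyCachedStore` are illustrative only; they are not Samza's classes and do not mirror CachedStore's exact flush policy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EvictionFlushSketch {
    // Hypothetical changelog producer that fails fast if used before start().
    static class ToyProducer {
        private boolean started = false;
        final List<String> sent = new ArrayList<>();

        void start() { started = true; }

        void send(String key, String value) {
            if (!started) {
                throw new IllegalStateException(
                    "send() called before producers were started (e.g. from init())");
            }
            sent.add(key + "=" + value);
        }
    }

    // Hypothetical write-back LRU cache in front of a changelog-backed store.
    static class ToyCachedStore {
        private final ToyProducer changelog;
        private final Map<String, String> backing = new HashMap<>();
        private final Map<String, String> dirty = new LinkedHashMap<>();
        private final LinkedHashMap<String, String> cache;

        ToyCachedStore(int capacity, ToyProducer changelog) {
            this.changelog = changelog;
            // accessOrder = true gives LRU iteration order.
            this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                    if (size() > capacity) {
                        flush(); // evicting forces pending writes out to the changelog
                        return true;
                    }
                    return false;
                }
            };
        }

        void put(String key, String value) {
            cache.put(key, value);
            dirty.put(key, value); // buffered until the next flush
        }

        String get(String key) {
            if (cache.containsKey(key)) {
                return cache.get(key);
            }
            String value = backing.get(key);
            cache.put(key, value); // a read miss inserts, which may evict and flush
            return value;
        }

        void flush() {
            for (Map.Entry<String, String> e : dirty.entrySet()) {
                changelog.send(e.getKey(), e.getValue());
                backing.put(e.getKey(), e.getValue());
            }
            dirty.clear();
        }
    }

    // Producer not started yet, as when init() runs before startProducers.
    public static boolean readFailsBeforeStart() {
        ToyCachedStore store = new ToyCachedStore(2, new ToyProducer());
        store.put("a", "1");
        store.put("b", "2");
        try {
            store.get("c"); // miss -> insert -> evict -> flush -> send
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // Producer started first, as in the proposed ordering.
    public static List<String> readAfterStart() {
        ToyProducer producer = new ToyProducer();
        producer.start();
        ToyCachedStore store = new ToyCachedStore(2, producer);
        store.put("a", "1");
        store.put("b", "2");
        store.get("c");
        return producer.sent;
    }

    public static void main(String[] args) {
        System.out.println(readFailsBeforeStart()); // true
        System.out.println(readAfterStart());       // [a=1, b=2]
    }
}
```

Before the producer starts, the get("c") miss surfaces as a clear IllegalStateException rather than a NoSuchElementException from a map lookup; once it has started, the same read simply flushes the two dirty entries.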



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
