[ https://issues.apache.org/jira/browse/SAMZA-80?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-80:
---------------------------------

    Attachment: SAMZA-80.0.patch

Attaching patch.

1. Add a warning to the MetricsHelper class noting that metrics can be
accessed from other threads, so dynamic gauges must be thread-safe.
2. Introduce a cacheCount variable to make CachedStore thread-safe, since
MetricsReporters would otherwise end up calling cache.size, which isn't
thread-safe for a LinkedHashMap.
3. Manually manage the prev field of Scala's DoubleLinkedList, since the class
does not appear to maintain it properly.
4. Add a test to validate that manually managing the prev field fixes the
problem.
5. Add a more generic test that simulates common behavior of StreamTasks that
use storage.
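A minimal sketch of the idea behind (1) and (2). It is written in Java rather than the patch's Scala and is not taken from SAMZA-80.0.patch. The LinkedHashMap is touched only by the task thread, which also keeps a volatile cacheCount up to date. A gauge running on a reporter thread reads that counter instead of calling size() on the map. The class name CountingCache, its methods, and the LRU (access-order) eviction are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not the CachedStore code from SAMZA-80.0.patch.
class CountingCache<K, V> {
    private final int maxSize;

    // Only the task thread reads or writes this map: LinkedHashMap is not
    // thread-safe, and in access order even get() reorders its entries.
    private final LinkedHashMap<K, V> cache;

    // Written only by the task thread; volatile so that a metrics reporter
    // thread always sees the latest value without touching the map.
    private volatile int cacheCount = 0;

    CountingCache(int maxSize) {
        this.maxSize = maxSize;
        // accessOrder = true gives LRU eviction (an assumption of this sketch).
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > CountingCache.this.maxSize;
            }
        };
    }

    // Task thread only.
    V get(K key) {
        return cache.get(key);
    }

    // Task thread only. Refresh the counter after any eviction done by put().
    void put(K key, V value) {
        cache.put(key, value);
        cacheCount = cache.size();
    }

    // Task thread only.
    void delete(K key) {
        cache.remove(key);
        cacheCount = cache.size();
    }

    // Safe to call from any thread, e.g. from a gauge polled by a reporter.
    int getCacheCount() {
        return cacheCount;
    }
}
```

A reporter-side gauge would only ever call getCacheCount(), so the map itself never crosses threads.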

For (4): because prev was always null, we were ending up with multiple
versions of the dirty list floating around in the cache. Some versions of the
dirty list would then point to cache elements that had already been flushed
out by a flush triggered through a different version of the dirty list. This
is consistent with the NPE thrown from inside CachedStore.flush in the trace
below.
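To illustrate why prev matters, here is a hypothetical Java sketch of a dirty list that sets prev explicitly on every link and unlink. It is not Scala's DoubleLinkedList and not the patch itself. It assumes each cache entry keeps a reference to its node, so a key that is dirtied again can be unlinked in O(1) and re-added at the head. That leaves exactly one list for flush to walk. The names DirtyList, Node, addFirst, remove, and flush are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a dirty list whose prev pointers are maintained by
// hand on every link and unlink, so there is only ever one list.
class DirtyList<K> {
    static final class Node<T> {
        final T key;
        Node<T> prev;
        Node<T> next;

        Node(T key) {
            this.key = key;
        }
    }

    private Node<K> head; // most recently dirtied key
    private Node<K> tail; // oldest dirty key

    // Push a dirty key at the head; the caller keeps the node for remove().
    Node<K> addFirst(K key) {
        Node<K> node = new Node<>(key);
        node.next = head;
        if (head != null) {
            head.prev = node; // the step that must not be skipped
        }
        head = node;
        if (tail == null) {
            tail = node;
        }
        return node;
    }

    // Unlink a node from anywhere in the list using its prev pointer.
    void remove(Node<K> node) {
        if (node.prev != null) node.prev.next = node.next; else head = node.next;
        if (node.next != null) node.next.prev = node.prev; else tail = node.prev;
        node.prev = null;
        node.next = null;
    }

    // Walk from the oldest key back to the newest, then reset to empty.
    List<K> flush() {
        List<K> keys = new ArrayList<>();
        for (Node<K> n = tail; n != null; n = n.prev) {
            keys.add(n.key);
        }
        head = null;
        tail = null;
        return keys;
    }
}
```

In this sketch, dropping the head.prev assignment in addFirst would make the backward walk in flush stop after the tail, silently skipping the other dirty keys.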

RB available at:

  https://reviews.apache.org/r/15900/diff/raw/

> KV cache store fails with NPE
> -----------------------------
>
>                 Key: SAMZA-80
>                 URL: https://issues.apache.org/jira/browse/SAMZA-80
>             Project: Samza
>          Issue Type: Bug
>          Components: kv
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-80.0.patch
>
>
> I came across a situation in which a StreamTask with a store triggers an NPE
> in the caching class (CachedStore):
> {noformat}
> java.lang.NullPointerException
>   at org.apache.samza.storage.kv.CachedStore$$anonfun$flush$2.apply(CachedStore.scala:144)
>   at org.apache.samza.storage.kv.CachedStore$$anonfun$flush$2.apply(CachedStore.scala:142)
>   at scala.collection.mutable.LinkedListLike$class.foreach(LinkedListLike.scala:111)
>   at scala.collection.mutable.DoubleLinkedList.foreach(DoubleLinkedList.scala:41)
>   at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:142)
>   at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:65)
>   at java.util.LinkedHashMap.addEntry(LinkedHashMap.java:410)
>   at java.util.HashMap.put(HashMap.java:385)
>   at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:83)
>   at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:48)
>   at com.linkedin.samza.example.StatefulTask.process(StatefulTask.java:51)
>   at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:168)
>   at org.apache.samza.container.SamzaContainer.process(SamzaContainer.scala:563)
>   at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:455)
>   at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> {noformat}
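The trace shows that a plain read can trigger a flush. CachedStore.get inserts into the LinkedHashMap (HashMap.put), LinkedHashMap.addEntry calls removeEldestEntry, and that hook calls flush. Below is a hypothetical Java sketch of that call path. It assumes that eviction writes dirty values back to a backing store before the eldest entry is dropped. ReadThroughCache, backingStore, dirtyKeys, and flushes are illustrative names, not Samza's API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the get -> put -> removeEldestEntry -> flush path.
class ReadThroughCache {
    final Map<String, String> backingStore = new HashMap<>();
    private final Set<String> dirtyKeys = new HashSet<>();
    private final LinkedHashMap<String, String> cache;
    int flushes = 0;

    ReadThroughCache(final int maxSize) {
        cache = new LinkedHashMap<String, String>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                if (size() <= maxSize) {
                    return false;
                }
                // Evicting a dirty entry would lose its value, so write back first.
                if (dirtyKeys.contains(eldest.getKey())) {
                    flush();
                }
                return true;
            }
        };
    }

    void put(String key, String value) {
        cache.put(key, value);
        dirtyKeys.add(key);
    }

    String get(String key) {
        String value = cache.get(key);
        if (value == null) {
            value = backingStore.get(key);
            // A read miss inserts into the cache, which may evict and flush.
            if (value != null) {
                cache.put(key, value);
            }
        }
        return value;
    }

    // Write every dirty key's cached value to the backing store.
    void flush() {
        for (String key : dirtyKeys) {
            backingStore.put(key, cache.get(key));
        }
        dirtyKeys.clear();
        flushes++;
    }
}
```

Because the write-back happens inside removeEldestEntry, any cache miss in get() can end up paying for a full flush.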



--
This message was sent by Atlassian JIRA
(v6.1#6144)