[https://issues.apache.org/jira/browse/SAMZA-80?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Chris Riccomini updated SAMZA-80:
---------------------------------
Attachment: SAMZA-80.0.patch
Attaching patch.
1. Added a warning to the MetricsHelper class that metrics can be accessed
from other threads, so dynamic gauges must be thread safe.
2. Introduced a cacheCount variable to make CachedStore thread safe, since
MetricsReporters would end up calling cache.size, which isn't thread safe for
LinkedHashMap.
3. Manually manage Scala's DoubleLinkedList.prev field, since DoubleLinkedList
does not appear to maintain it properly.
4. Added a test to validate that manually managing the prev field fixes the
problem.
5. Added a more generic test to simulate some common behavior for StreamTasks
that use storage.
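To illustrate the idea behind (2), here is a minimal Java sketch, not the code
from the patch. The class and method names (CountedCache, sizeGauge) are
hypothetical, and it assumes a single task thread performs all writes while
reporter threads only read the counter.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical illustration only; the real CachedStore is written in Scala.
final class CountedCache {
    // Written only by the task thread, read by reporter threads.
    // volatile makes the most recent write visible to readers.
    private volatile int cacheCount = 0;

    // LinkedHashMap is not thread safe, so only the task thread touches it.
    private final Map<String, String> cache = new LinkedHashMap<>(16, 0.75f, true);

    void put(String key, String value) {
        cache.put(key, value);
        cacheCount = cache.size(); // refresh the counter on the owning thread
    }

    void remove(String key) {
        cache.remove(key);
        cacheCount = cache.size();
    }

    // The gauge reads the counter and never calls cache.size() itself.
    Supplier<Integer> sizeGauge() {
        return () -> cacheCount;
    }

    public static void main(String[] args) {
        CountedCache store = new CountedCache();
        store.put("a", "1");
        store.put("b", "2");
        System.out.println(store.sizeGauge().get());
    }
}
```

The point of the design is that a reporter thread never touches the map; it
only reads a value that the owning thread published.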
For (4), the odd behavior caused by prev always being null was that we ended
up with multiple versions of the dirty list floating around in the cache. As a
result, some versions of the dirty list pointed to cache elements that had
already been flushed out (by a flush triggered from a different version of the
dirty list).
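As a rough illustration of why the prev field matters, here is a small Java
sketch of a doubly linked list whose prepend sets the old head's prev by hand.
It is not the patch and does not reproduce Scala's DoubleLinkedList; the names
DirtyList and Node are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not Samza's or Scala's implementation.
final class DirtyList {
    static final class Node {
        final String key;
        Node prev;
        Node next;

        Node(String key) {
            this.key = key;
        }
    }

    private Node head;

    // Prepend a key and return its node so a cache entry can hold on to it.
    Node prepend(String key) {
        Node node = new Node(key);
        node.next = head;
        if (head != null) {
            // The manual step: without this, the old head's prev stays null.
            head.prev = node;
        }
        head = node;
        return node;
    }

    // Unlink a node in O(1). This is only correct if prev was maintained:
    // a middle node with a null prev would be mistaken for the head.
    void remove(Node node) {
        if (node.prev != null) {
            node.prev.next = node.next;
        } else {
            head = node.next;
        }
        if (node.next != null) {
            node.next.prev = node.prev;
        }
        node.prev = null;
        node.next = null;
    }

    // Collect the keys from head to tail.
    List<String> keys() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.key);
        }
        return out;
    }
}
```

In this sketch, if prepend skipped the head.prev assignment, removing a middle
node would move head past the nodes in front of it. Holders of the old head
would then see a different list than holders of the new one, which is similar
in spirit to the multiple dirty list versions described above.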
RB available at:
https://reviews.apache.org/r/15900/diff/raw/
> KV cache store fails with NPE
> -----------------------------
>
> Key: SAMZA-80
> URL: https://issues.apache.org/jira/browse/SAMZA-80
> Project: Samza
> Issue Type: Bug
> Components: kv
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.7.0
>
> Attachments: SAMZA-80.0.patch
>
>
> I came across a situation in which a StreamTask with a store will trigger an
> NPE in the caching class.
> {noformat}
> java.lang.NullPointerException
> at org.apache.samza.storage.kv.CachedStore$$anonfun$flush$2.apply(CachedStore.scala:144)
> at org.apache.samza.storage.kv.CachedStore$$anonfun$flush$2.apply(CachedStore.scala:142)
> at scala.collection.mutable.LinkedListLike$class.foreach(LinkedListLike.scala:111)
> at scala.collection.mutable.DoubleLinkedList.foreach(DoubleLinkedList.scala:41)
> at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:142)
> at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:65)
> at java.util.LinkedHashMap.addEntry(LinkedHashMap.java:410)
> at java.util.HashMap.put(HashMap.java:385)
> at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:83)
> at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:48)
> at com.linkedin.samza.example.StatefulTask.process(StatefulTask.java:51)
> at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:168)
> at org.apache.samza.container.SamzaContainer.process(SamzaContainer.scala:563)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:455)
> at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> {noformat}