Updated Branches: refs/heads/master 2ec5eb8d0 -> 4bf3f5eb0
SAMZA-80; fix issue with scala 2.8's double linked list in cached store that was leading to an npe. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4bf3f5eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4bf3f5eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4bf3f5eb Branch: refs/heads/master Commit: 4bf3f5eb0d2fbbfebc2fdeb6f38bd9b55de61597 Parents: 2ec5eb8 Author: Chris Riccomini <[email protected]> Authored: Tue Dec 3 09:37:23 2013 -0800 Committer: Chris Riccomini <[email protected]> Committed: Tue Dec 3 09:37:23 2013 -0800 ---------------------------------------------------------------------- .../apache/samza/metrics/MetricsHelper.scala | 5 + .../apache/samza/storage/kv/CachedStore.scala | 20 +++- .../samza/storage/kv/TestKeyValueStores.scala | 115 ++++++++++++++++++- 3 files changed, 132 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4bf3f5eb/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala index b412e46..8043f37 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala @@ -37,6 +37,11 @@ trait MetricsHelper { registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value)) } + /** + * Specify a dynamic gauge that always returns the latest value when polled. + * The value closure must be thread safe, since metrics reporters may access + * it from another thread. 
+ */ def newGauge[T](name: String, value: () => T) = { registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value()) { override def getValue = value() http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4bf3f5eb/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index 6685f85..429f51a 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -49,7 +49,10 @@ class CachedStore[K, V]( val metrics: CachedStoreMetrics = new CachedStoreMetrics) extends KeyValueStore[K, V] with Logging { /** the number of items in the dirty list */ - private var dirtyCount = 0 + @volatile private var dirtyCount = 0 + + /** the number of items currently in the cache */ + @volatile private var cacheCount = 0 /** the list of items to be written out on flush from newest to oldest */ private var dirty = new mutable.DoubleLinkedList[K]() @@ -68,8 +71,12 @@ class CachedStore[K, V]( } } + // Use counters here, rather than directly accessing variables using .size + // since metrics can be accessed in other threads, and cache.size is not + // thread safe since we're using a LinkedHashMap, and dirty.size is slow + // since it requires a full traversal of the linked list. 
metrics.setDirtyCount(() => dirtyCount) - metrics.setCacheSize(() => cache.size) + metrics.setCacheSize(() => cacheCount) def get(key: K) = { metrics.gets.inc @@ -81,6 +88,7 @@ class CachedStore[K, V]( } else { val v = store.get(key) cache.put(key, new CacheEntry(v, null)) + cacheCount = cache.size v } } @@ -114,11 +122,17 @@ class CachedStore[K, V]( found.dirty.remove } } - this.dirty = new mutable.DoubleLinkedList(key, this.dirty) + + // We have to manually manage the prev value because Scala 2.8 is totally + // broken. See SAMZA-80 for details. + val oldDirtyList = this.dirty + this.dirty = new mutable.DoubleLinkedList(key, oldDirtyList) + oldDirtyList.prev = this.dirty // add the key to the cache (but don't allocate a new cache entry if we already have one) if (found == null) { cache.put(key, new CacheEntry(value, this.dirty)) + cacheCount = cache.size } else { found.value = value found.dirty = this.dirty http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4bf3f5eb/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala index 03a189e..2e5f6a3 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala @@ -28,10 +28,7 @@ import scala.collection.JavaConversions._ import org.apache.samza.serializers.IntegerSerde import org.iq80.leveldb.Options import org.junit.After -import org.junit.Assert.assertEquals -import org.junit.Assert.assertFalse -import org.junit.Assert.assertNull -import org.junit.Assert.assertTrue +import org.junit.Assert._ import org.junit.Before import org.junit.Test import org.junit.runner.RunWith @@ -152,6 +149,114 @@ class TestKeyValueStores(cache: Boolean) { 
serializedStore.putAll(List(new Entry[java.lang.Integer, java.lang.Integer](0, null))) } + /** + * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList + * implementation. The issue is that it doesn't work. More specifically, + * creating a DoubleLinkedList from an existing list does not update the + * "prev" field of the existing list's head to point to the new head. As a + * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null. + * Samza gets around this by manually updating the field itself. See SAMZA-80 + * for details. + * + * This issue is exposed in Samza's KV cache implementation, which uses + * DoubleLinkedList, so all comments in this method are discussing the cached + * implementation, but the test is still useful as a sanity check for + * non-cached stores. + */ + @Test + def testBrokenScalaDoubleLinkedList() { + val something = b("") + val keys = letters + .map(b(_)) + .toArray + + // Load the cache to capacity. + letters + .slice(0, TestKeyValueStores.CacheSize) + .map(b(_)) + .foreach(store.put(_, something)) + + // Now keep everything in the cache, but with an empty dirty list. + store.flush + + // Dirty list is now empty, and every CacheEntry has dirty=null. + + // Corrupt the dirty list by creating two dirty lists that toggle back and + // forth depending on whether the last dirty write was to 1 or 0. The trick + // here is that every element in the cache is treated as the "head" of the + // DoubleLinkedList (prev==null), even though it's not necessarily. Thus, + // you can end up with multiple nodes each having their own version of the + // dirty list with different elements in them. + store.put(keys(1), something) + store.put(keys(0), something) + store.put(keys(1), something) + store.flush + // The dirty list is now empty, but 0's dirty field actually has 0 and 1. 
+ store.put(keys(0), something) + // The dirty list now has 0 and 1, but 1's dirty field is null in the + // cache because it was just flushed. + + // Get rid of 1 from the cache by reading every other element, and then + // putting one new element. + letters + .slice(2, TestKeyValueStores.CacheSize) + .map(b(_)) + .foreach(store.get(_)) + store.put(keys(TestKeyValueStores.CacheSize), something) + + // Now try and trigger an NPE since the dirty list has an element (1) + // that's no longer in the cache. + store.flush + } + + /** + * A little test that tries to simulate a few common patterns: + * read-modify-write, and do-some-stuff-then-delete (windowing). + */ + @Test + def testRandomReadWriteRemove() { + // Make test deterministic by seeding the random number generator. + val rand = new Random(12345) + val keys = letters + .map(b(_)) + .toArray + + // Map from letter to key byte array used for letter, and expected value. + // We have to go through some acrobatics here since Java's byte array uses + // object identity for .equals. Two byte arrays can have identical byte + // elements, but not be equal. 
+ var expected = Map[String, (Array[Byte], String)]() + + (0 until 100).foreach(loop => { + (0 until 30).foreach(i => { + val idx = rand.nextInt(keys.length) + val randomValue = letters(rand.nextInt(keys.length)) + val key = keys(idx) + val currentVal = store.get(key) + store.put(key, b(randomValue)) + expected += letters(idx) -> (key, randomValue) + }) + + for ((k, v) <- expected) { + val bytes = store.get(v._1) + assertNotNull(bytes) + assertEquals(v._2, new String(bytes, "UTF-8")) + } + + val iterator = store.all + + while (iterator.hasNext) { + val key = iterator.next.getKey + store.delete(key) + expected -= new String(key, "UTF-8") + } + + iterator.close + + assertEquals(0, expected.size) + }) + } + def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]) { for (v <- vals) { assertTrue(iter.hasNext) @@ -178,7 +283,7 @@ class TestKeyValueStores(cache: Boolean) { object TestKeyValueStores { val CacheSize = 10 - val BatchSize = 2 + val BatchSize = 5 @Parameters def parameters: java.util.Collection[Array[java.lang.Boolean]] = Arrays.asList(Array(java.lang.Boolean.TRUE), Array(java.lang.Boolean.FALSE)) }
