Updated Branches:
  refs/heads/master 2ec5eb8d0 -> 4bf3f5eb0

SAMZA-80; fix an issue with Scala 2.8's DoubleLinkedList in the cached store
that was leading to an NPE.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4bf3f5eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4bf3f5eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4bf3f5eb

Branch: refs/heads/master
Commit: 4bf3f5eb0d2fbbfebc2fdeb6f38bd9b55de61597
Parents: 2ec5eb8
Author: Chris Riccomini <[email protected]>
Authored: Tue Dec 3 09:37:23 2013 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Tue Dec 3 09:37:23 2013 -0800

----------------------------------------------------------------------
 .../apache/samza/metrics/MetricsHelper.scala    |   5 +
 .../apache/samza/storage/kv/CachedStore.scala   |  20 +++-
 .../samza/storage/kv/TestKeyValueStores.scala   | 115 ++++++++++++++++++-
 3 files changed, 132 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4bf3f5eb/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
index b412e46..8043f37 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -37,6 +37,11 @@ trait MetricsHelper {
     registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value))
   }
 
+  /**
+   * Specify a dynamic gauge that always returns the latest value when polled. 
+   * The value closure must be thread safe, since metrics reporters may access 
+   * it from another thread.
+   */
   def newGauge[T](name: String, value: () => T) = {
     registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, 
value()) {
       override def getValue = value()

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4bf3f5eb/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 6685f85..429f51a 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -49,7 +49,10 @@ class CachedStore[K, V](
   val metrics: CachedStoreMetrics = new CachedStoreMetrics) extends 
KeyValueStore[K, V] with Logging {
 
   /** the number of items in the dirty list */
-  private var dirtyCount = 0
+  @volatile private var dirtyCount = 0
+
+  /** the number of items currently in the cache */
+  @volatile private var cacheCount = 0
 
   /** the list of items to be written out on flush from newest to oldest */
   private var dirty = new mutable.DoubleLinkedList[K]()
@@ -68,8 +71,12 @@ class CachedStore[K, V](
     }
   }
 
+  // Report counters here rather than calling .size directly: metrics may be
+  // read from other threads, cache.size is not thread safe (the cache is a
+  // LinkedHashMap), and dirty.size is slow because it traverses the whole
+  // linked list.
   metrics.setDirtyCount(() => dirtyCount)
-  metrics.setCacheSize(() => cache.size)
+  metrics.setCacheSize(() => cacheCount)
 
   def get(key: K) = {
     metrics.gets.inc
@@ -81,6 +88,7 @@ class CachedStore[K, V](
     } else {
       val v = store.get(key)
       cache.put(key, new CacheEntry(v, null))
+      cacheCount = cache.size
       v
     }
   }
@@ -114,11 +122,17 @@ class CachedStore[K, V](
         found.dirty.remove
       }
     }
-    this.dirty = new mutable.DoubleLinkedList(key, this.dirty)
+
+    // Manually point the old head's prev at the new head, because Scala 2.8's
+    // DoubleLinkedList constructor does not set it. See SAMZA-80 for details.
+    val oldDirtyList = this.dirty
+    this.dirty = new mutable.DoubleLinkedList(key, oldDirtyList)
+    oldDirtyList.prev = this.dirty
 
     // add the key to the cache (but don't allocate a new cache entry if we 
already have one)
     if (found == null) {
       cache.put(key, new CacheEntry(value, this.dirty))
+      cacheCount = cache.size
     } else {
       found.value = value
       found.dirty = this.dirty

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4bf3f5eb/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index 03a189e..2e5f6a3 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -28,10 +28,7 @@ import scala.collection.JavaConversions._
 import org.apache.samza.serializers.IntegerSerde
 import org.iq80.leveldb.Options
 import org.junit.After
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertFalse
-import org.junit.Assert.assertNull
-import org.junit.Assert.assertTrue
+import org.junit.Assert._
 import org.junit.Before
 import org.junit.Test
 import org.junit.runner.RunWith
@@ -152,6 +149,114 @@ class TestKeyValueStores(cache: Boolean) {
     serializedStore.putAll(List(new Entry[java.lang.Integer, 
java.lang.Integer](0, null)))
   }
 
+  /**
+   * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList
+   * implementation: it does not maintain backward (prev) links. Specifically,
+   * creating a DoubleLinkedList from an existing list does not update the
+   * "prev" field of the existing list's head to point to the new head. As a
+   * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null.
+   * Samza gets around this by manually updating the field itself. See SAMZA-80
+   * for details.
+   *
+   * This issue is exposed in Samza's KV cache implementation, which uses
+   * DoubleLinkedList, so all comments in this method describe the cached
+   * implementation, but the test is still useful as a sanity check for
+   * non-cached stores.
+   */
+  @Test
+  def testBrokenScalaDoubleLinkedList() {
+    val something = b("")
+    val keys = letters
+      .map(b(_))
+      .toArray
+
+    // Load the cache to capacity.
+    letters
+      .slice(0, TestKeyValueStores.CacheSize)
+      .map(b(_))
+      .foreach(store.put(_, something))
+
+    // Flush: everything stays in the cache, but the dirty list becomes empty.
+    store.flush
+
+    // Dirty list is now empty, and every CacheEntry has dirty=null.
+
+    // Corrupt the dirty list by creating two dirty lists that toggle back and
+    // forth depending on whether the last dirty write was to 1 or 0. The trick
+    // here is that every element in the cache is treated as the "head" of the
+    // DoubleLinkedList (prev==null), even when it is not. Thus,
+    // you can end up with multiple nodes, each having its own version of the
+    // dirty list with different elements in it.
+    store.put(keys(1), something)
+    store.put(keys(0), something)
+    store.put(keys(1), something)
+    store.flush
+    // The dirty list is now empty, but 0's dirty field actually has 0 and 1.
+    store.put(keys(0), something)
+    // The dirty list now has 0 and 1, but 1's dirty field is null in the
+    // cache because it was just flushed.
+
+    // Evict 1 from the cache by reading all remaining elements (2 until
+    // CacheSize), and then putting one new element.
+    letters
+      .slice(2, TestKeyValueStores.CacheSize)
+      .map(b(_))
+      .foreach(store.get(_))
+    store.put(keys(TestKeyValueStores.CacheSize), something)
+
+    // Now try to trigger an NPE, since the dirty list has an element (1)
+    // that's no longer in the cache.
+    store.flush
+  }
+
+  /**
+   * A small randomized test that simulates a few common access patterns:
+   * read-modify-write, and do-some-stuff-then-delete (windowing).
+   */
+  @Test
+  def testRandomReadWriteRemove() {
+    // Make test deterministic by seeding the random number generator.
+    val rand = new Random(12345)
+    val keys = letters
+      .map(b(_))
+      .toArray
+
+    // Map from letter to (key byte array used for that letter, expected value).
+    // We keep the exact key array around because Java byte arrays use
+    // object identity for .equals: two byte arrays can have identical byte
+    // elements, but not be equal.
+    var expected = Map[String, (Array[Byte], String)]()
+
+    (0 until 100).foreach(loop => { // 100 rounds: 30 random writes, verify all, then delete all
+      (0 until 30).foreach(i => {
+        val idx = rand.nextInt(keys.length)
+        val randomValue = letters(rand.nextInt(keys.length))
+        val key = keys(idx)
+        val currentVal = store.get(key) // the "read" of read-modify-write; result intentionally unused
+        store.put(key, b(randomValue))
+        expected += letters(idx) -> (key, randomValue)
+      })
+
+      for ((k, v) <- expected) {
+        val bytes = store.get(v._1)
+        assertNotNull(bytes)
+        assertEquals(v._2, new String(bytes, "UTF-8"))
+      }
+
+      val iterator = store.all
+
+      while (iterator.hasNext) {
+        val key = iterator.next.getKey
+        store.delete(key)
+        expected -= new String(key, "UTF-8")
+      }
+
+      iterator.close
+
+      assertEquals(0, expected.size) // the full scan must have visited (and deleted) every written key
+    })
+  }
+
   def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], 
Array[Byte]]) {
     for (v <- vals) {
       assertTrue(iter.hasNext)
@@ -178,7 +283,7 @@ class TestKeyValueStores(cache: Boolean) {
 
 object TestKeyValueStores {
   val CacheSize = 10
-  val BatchSize = 2
+  val BatchSize = 5
   @Parameters
   def parameters: java.util.Collection[Array[java.lang.Boolean]] = 
Arrays.asList(Array(java.lang.Boolean.TRUE), Array(java.lang.Boolean.FALSE))
 }

Reply via email to