Repository: samza
Updated Branches:
  refs/heads/master 479932242 -> 7dd356c50


SAMZA-957: Avoid unnecessary KV Store flushes (part 3)


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7dd356c5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7dd356c5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7dd356c5

Branch: refs/heads/master
Commit: 7dd356c5010761bce22accf1c8a9f00f0a62d822
Parents: 4799322
Author: Jacob Maes <[email protected]>
Authored: Tue Jun 7 14:39:13 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Tue Jun 7 14:39:13 2016 -0700

----------------------------------------------------------------------
 .../apache/samza/storage/kv/CachedStore.scala   | 30 +++++-----
 .../samza/storage/kv/TestCachedStore.scala      | 58 ++++++++++++++++++--
 2 files changed, 70 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7dd356c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index c28f8db..ae6717d 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -63,18 +63,12 @@ class CachedStore[K, V](
   private val cache = new java.util.LinkedHashMap[K, CacheEntry[K, V]]((cacheSize * 1.2).toInt, 1.0f, true) {
     override def removeEldestEntry(eldest: java.util.Map.Entry[K, CacheEntry[K, V]]): Boolean = {
       val evict = super.size > cacheSize
-      // We need backwards compatibility with the previous broken flushing behavior for array keys.
-      if (evict || hasArrayKeys) {
+      if (evict) {
         val entry = eldest.getValue
         // if this entry hasn't been written out yet, flush it and all other dirty keys
         if (entry.dirty != null) {
-          if (hasArrayKeys) {
-            debug("Found a dirty entry and cache has array keys. Flushing.")
-            flush()
-          } else {
-            debug("Found a dirty entry. Calling putAll() on all dirty entries.")
-            putAllDirtyEntries()
-          }
+          debug("Found a dirty entry. Calling putAll() on all dirty entries.")
+          putAllDirtyEntries()
         }
       }
       evict
@@ -128,14 +122,14 @@ class CachedStore[K, V](
 
   override def range(from: K, to: K): KeyValueIterator[K, V] = {
     metrics.ranges.inc
-    flush()
+    putAllDirtyEntries()
 
     new CachedStoreIterator(store.range(from, to))
   }
 
   override def all(): KeyValueIterator[K, V] = {
     metrics.alls.inc
-    flush()
+    putAllDirtyEntries()
 
     new CachedStoreIterator(store.all())
   }
@@ -173,9 +167,19 @@ class CachedStore[K, V](
     }
 
     // putAll() dirty values if the write list is full.
-    if (dirtyCount >= writeBatchSize) {
+    val purgeNeeded = if (dirtyCount >= writeBatchSize) {
       debug("Dirty count %s >= write batch size %s. Calling putAll() on all dirty entries." format (dirtyCount, writeBatchSize))
+      true
+    } else if (hasArrayKeys) {
+      // Flush every time to support the following legacy behavior:
+      // If array keys are used with a cached store, get() will always miss the cache because of array equality semantics
+      // However, it will fall back to the underlying store which does support arrays.
+      true
+    } else {
+      false
+    }
 
+    if (purgeNeeded) {
       putAllDirtyEntries()
     }
   }
@@ -232,7 +236,7 @@ class CachedStore[K, V](
   private def checkKeyIsArray(key: K) {
     if (!containsArrayKeys && key.isInstanceOf[Array[_]]) {
       // Warn the first time that we see an array key.
-      warn("Using arrays as keys results in unpredictable behavior since cache is implemented with a map. Consider using ByteBuffer, or a different key type.")
+      warn("Using arrays as keys results in unpredictable behavior since cache is implemented with a map. Consider using ByteBuffer, or a different key type, or turn off the cache altogether.")
       containsArrayKeys = true
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/7dd356c5/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
index eee7447..96eb5fa 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala
@@ -20,14 +20,14 @@
 package org.apache.samza.storage.kv
 
 import java.util
+import java.util.Arrays
 
-import org.junit.Test
 import org.junit.Assert._
+import org.junit.Test
 import org.mockito.ArgumentCaptor
-import org.mockito.Mockito._
 import org.mockito.Matchers.anyObject
+import org.mockito.Mockito._
 
-import java.util.Arrays
 import scala.collection.JavaConverters._
 
 class TestCachedStore {
@@ -38,12 +38,60 @@ class TestCachedStore {
 
     assertFalse(store.hasArrayKeys)
     store.put("test1-key".getBytes("UTF-8"), "test1-value".getBytes("UTF-8"))
-    // Ensure we preserve old, broken flushing behavior for array keys
-    verify(kv).flush();
     assertTrue(store.hasArrayKeys)
   }
 
   @Test
+  def testLRUCacheEviction() {
+    val kv = spy(new MockKeyValueStore())
+    val store = new CachedStore[String, String](kv, 2, 2)
+    assertFalse("KV store should be empty", kv.all().hasNext)
+
+    // Below eviction threshold
+    store.put("test1-key", "test1-value")
+    assertFalse("Entries should not have been purged yet", kv.all().hasNext)
+
+    // Batch limit reached
+    store.put("test2-key", "test2-value")
+    assertTrue("Entries should be purged as soon as there are batchSize dirty entries", kv.all().hasNext)
+
+    // kv.putAll() should have been called, verified below.
+
+    // All dirty values should have been added to the underlying store
+    // KV store should have both items
+    val kvItr = kv.all();
+    assertNotNull(kvItr.next())
+    assertNotNull(kvItr.next())
+    assertFalse(kvItr.hasNext)
+
+    // Above eviction threshold but eldest entries are not dirty
+    store.put("test3-key", "test3-value")
+
+    // KV store should not have the 3rd item. We only purge if the batch size is exceeded or if the eldest(expiring) entry is dirty.
+    val kvItr2 = kv.all();
+    assertNotNull(kvItr2.next())
+    assertNotNull(kvItr2.next())
+    assertFalse(kvItr2.hasNext)
+
+    // Force the dirty key to be the eldest by reading a different key
+    store.get("test2-key")
+
+    // Add one more. We should not purge all items again. Only when dirty items exceed the threshold.
+    store.put("test4-key", "test4-value")
+
+    // The eldest item should have been purged along with the just-added item, so the KV store should have all 4 items.
+    val kvItr3 = kv.all();
+    assertNotNull(kvItr3.next())
+    assertNotNull(kvItr3.next())
+    assertNotNull(kvItr3.next())
+    assertNotNull(kvItr3.next())
+    assertFalse(kvItr3.hasNext)
+
+    // There should have been 2 purges; one for exceeding the batch size, and one for expiring a dirty cache entry.
+    verify(kv, times(2)).putAll(anyObject());
+  }
+
+  @Test
   def testIterator() {
     val kv = new MockKeyValueStore()
     val store = new CachedStore[String, String](kv, 100, 100)

Reply via email to