Repository: samza
Updated Branches:
  refs/heads/master 479932242 -> 7dd356c50
SAMZA-957: Avoid unnecessary KV Store flushes (part 3) Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7dd356c5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7dd356c5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7dd356c5 Branch: refs/heads/master Commit: 7dd356c5010761bce22accf1c8a9f00f0a62d822 Parents: 4799322 Author: Jacob Maes <[email protected]> Authored: Tue Jun 7 14:39:13 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Tue Jun 7 14:39:13 2016 -0700 ---------------------------------------------------------------------- .../apache/samza/storage/kv/CachedStore.scala | 30 +++++----- .../samza/storage/kv/TestCachedStore.scala | 58 ++++++++++++++++++-- 2 files changed, 70 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7dd356c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index c28f8db..ae6717d 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -63,18 +63,12 @@ class CachedStore[K, V]( private val cache = new java.util.LinkedHashMap[K, CacheEntry[K, V]]((cacheSize * 1.2).toInt, 1.0f, true) { override def removeEldestEntry(eldest: java.util.Map.Entry[K, CacheEntry[K, V]]): Boolean = { val evict = super.size > cacheSize - // We need backwards compatibility with the previous broken flushing behavior for array keys. 
- if (evict || hasArrayKeys) { + if (evict) { val entry = eldest.getValue // if this entry hasn't been written out yet, flush it and all other dirty keys if (entry.dirty != null) { - if (hasArrayKeys) { - debug("Found a dirty entry and cache has array keys. Flushing.") - flush() - } else { - debug("Found a dirty entry. Calling putAll() on all dirty entries.") - putAllDirtyEntries() - } + debug("Found a dirty entry. Calling putAll() on all dirty entries.") + putAllDirtyEntries() } } evict @@ -128,14 +122,14 @@ class CachedStore[K, V]( override def range(from: K, to: K): KeyValueIterator[K, V] = { metrics.ranges.inc - flush() + putAllDirtyEntries() new CachedStoreIterator(store.range(from, to)) } override def all(): KeyValueIterator[K, V] = { metrics.alls.inc - flush() + putAllDirtyEntries() new CachedStoreIterator(store.all()) } @@ -173,9 +167,19 @@ class CachedStore[K, V]( } // putAll() dirty values if the write list is full. - if (dirtyCount >= writeBatchSize) { + val purgeNeeded = if (dirtyCount >= writeBatchSize) { debug("Dirty count %s >= write batch size %s. Calling putAll() on all dirty entries." format (dirtyCount, writeBatchSize)) + true + } else if (hasArrayKeys) { + // Flush every time to support the following legacy behavior: + // If array keys are used with a cached store, get() will always miss the cache because of array equality semantics + // However, it will fall back to the underlying store which does support arrays. + true + } else { + false + } + if (purgeNeeded) { putAllDirtyEntries() } } @@ -232,7 +236,7 @@ class CachedStore[K, V]( private def checkKeyIsArray(key: K) { if (!containsArrayKeys && key.isInstanceOf[Array[_]]) { // Warn the first time that we see an array key. - warn("Using arrays as keys results in unpredictable behavior since cache is implemented with a map. Consider using ByteBuffer, or a different key type.") + warn("Using arrays as keys results in unpredictable behavior since cache is implemented with a map. 
Consider using ByteBuffer, or a different key type, or turn off the cache altogether.") containsArrayKeys = true } } http://git-wip-us.apache.org/repos/asf/samza/blob/7dd356c5/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala index eee7447..96eb5fa 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala @@ -20,14 +20,14 @@ package org.apache.samza.storage.kv import java.util +import java.util.Arrays -import org.junit.Test import org.junit.Assert._ +import org.junit.Test import org.mockito.ArgumentCaptor -import org.mockito.Mockito._ import org.mockito.Matchers.anyObject +import org.mockito.Mockito._ -import java.util.Arrays import scala.collection.JavaConverters._ class TestCachedStore { @@ -38,12 +38,60 @@ class TestCachedStore { assertFalse(store.hasArrayKeys) store.put("test1-key".getBytes("UTF-8"), "test1-value".getBytes("UTF-8")) - // Ensure we preserve old, broken flushing behavior for array keys - verify(kv).flush(); assertTrue(store.hasArrayKeys) } @Test + def testLRUCacheEviction() { + val kv = spy(new MockKeyValueStore()) + val store = new CachedStore[String, String](kv, 2, 2) + assertFalse("KV store should be empty", kv.all().hasNext) + + // Below eviction threshold + store.put("test1-key", "test1-value") + assertFalse("Entries should not have been purged yet", kv.all().hasNext) + + // Batch limit reached + store.put("test2-key", "test2-value") + assertTrue("Entries should be purged as soon as there are batchSize dirty entries", kv.all().hasNext) + + // kv.putAll() should have been called, verified below. 
+ + // All dirty values should have been added to the underlying store + // KV store should have both items + val kvItr = kv.all(); + assertNotNull(kvItr.next()) + assertNotNull(kvItr.next()) + assertFalse(kvItr.hasNext) + + // Above eviction threshold but eldest entries are not dirty + store.put("test3-key", "test3-value") + + // KV store should not have the 3rd item. We only purge if the batch size is exceeded or if the eldest(expiring) entry is dirty. + val kvItr2 = kv.all(); + assertNotNull(kvItr2.next()) + assertNotNull(kvItr2.next()) + assertFalse(kvItr2.hasNext) + + // Force the dirty key to be the eldest by reading a different key + store.get("test2-key") + + // Add one more. We should not purge all items again. Only when dirty items exceed the threshold. + store.put("test4-key", "test4-value") + + // The eldest item should have been purged along with the just-added item, so the KV store should have all 4 items. + val kvItr3 = kv.all(); + assertNotNull(kvItr3.next()) + assertNotNull(kvItr3.next()) + assertNotNull(kvItr3.next()) + assertNotNull(kvItr3.next()) + assertFalse(kvItr3.hasNext) + + // There should have been 2 purges; one for exceeding the batch size, and one for expiring a dirty cache entry. + verify(kv, times(2)).putAll(anyObject()); + } + + @Test def testIterator() { val kv = new MockKeyValueStore() val store = new CachedStore[String, String](kv, 100, 100)
