Repository: samza Updated Branches: refs/heads/master 22a1d6f18 -> 8c7e2eb04
SAMZA-812: CachedStore flush too often Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8c7e2eb0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8c7e2eb0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8c7e2eb0 Branch: refs/heads/master Commit: 8c7e2eb04c783e1f1cb7234c12b39e29cf68c903 Parents: 22a1d6f Author: Tommy Becker <[email protected]> Authored: Wed Nov 11 09:01:08 2015 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Wed Nov 11 09:01:08 2015 -0800 ---------------------------------------------------------------------- .../apache/samza/storage/kv/CachedStore.scala | 18 ++++++++----- .../samza/storage/kv/TestCachedStore.scala | 27 +++++++++++++++++++- 2 files changed, 37 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8c7e2eb0/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index 1112350..9a5b2d5 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -62,14 +62,18 @@ class CachedStore[K, V]( /** an lru cache of values that holds cacheEntries and calls flush() if necessary when discarding */ private val cache = new java.util.LinkedHashMap[K, CacheEntry[K, V]]((cacheSize * 1.2).toInt, 1.0f, true) { override def removeEldestEntry(eldest: java.util.Map.Entry[K, CacheEntry[K, V]]): Boolean = { - val entry = eldest.getValue - // if this entry hasn't been written out yet, flush it and all other dirty keys - if (entry.dirty != null) { - debug("Found a dirty entry. Flushing.") - - flush() + val evict = super.size > cacheSize + // We need backwards compatibility with the previous broken flushing behavior for array keys. + if (evict || hasArrayKeys) { + val entry = eldest.getValue + // if this entry hasn't been written out yet, flush it and all other dirty keys + if (entry.dirty != null) { + debug("Found a dirty entry. Flushing.") + + flush() + } } - super.size > cacheSize + evict } } http://git-wip-us.apache.org/repos/asf/samza/blob/8c7e2eb0/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala index cc9c9f3..198720c 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala @@ -33,6 +33,8 @@ class TestCachedStore { assertFalse(store.hasArrayKeys) store.put("test1-key".getBytes("UTF-8"), "test1-value".getBytes("UTF-8")) + // Ensure we preserve old, broken flushing behavior for array keys + verify(kv).flush(); assertTrue(store.hasArrayKeys) } @@ -88,4 +90,27 @@ class TestCachedStore { assertNull(kv.get(keys.get(1))) assertNull(store.get(keys.get(1))) } -} \ No newline at end of file + + @Test + def testFlushing() { + val kv = mock(classOf[KeyValueStore[String, String]]) + val store = new CachedStore[String, String](kv, 4, 4) + + val keys = Arrays.asList("test1-key", + "test2-key", + "test3-key", + "test4-key") + val values = Arrays.asList("test1-value", + "test2-value", + "test3-value", + "test4-value") + + for (i <- 0 until 3) { + store.put(keys.get(i), values.get(i)) + } + + verify(kv, never()).flush() + store.put(keys.get(3), values.get(3)); + verify(kv).flush(); + } +}
