Repository: samza Updated Branches: refs/heads/master 127707384 -> 5154f9eb8
SAMZA-2010: Handle null value in LocalReadWriteTable.putAll() To be consistent with put(), null values in input should be delete operation Author: Wei Song <[email protected]> Reviewers: Ahmed Abdul Hamid <[email protected]> Closes #827 from weisong44/SAMZA-2010 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5154f9eb Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5154f9eb Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5154f9eb Branch: refs/heads/master Commit: 5154f9eb8f484126dbd810a350bf15a10645a1f7 Parents: 1277073 Author: Wei Song <[email protected]> Authored: Thu Nov 29 15:31:58 2018 -0800 Committer: Wei Song <[email protected]> Committed: Thu Nov 29 15:31:58 2018 -0800 ---------------------------------------------------------------------- .../samza/storage/kv/LocalReadWriteTable.java | 19 ++++++++++++++++- .../storage/kv/TestLocalReadWriteTable.java | 22 +++++++++++++------- .../storage/kv/TestLocalReadableTable.java | 6 +++--- 3 files changed, 35 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/5154f9eb/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java index 98d3768..2704429 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java @@ -18,6 +18,7 @@ */ package org.apache.samza.storage.kv; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.samza.metrics.Counter; @@ -69,7 +70,23 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V> @Override public void putAll(List<Entry<K, V>> entries) { - instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(entries)); + List<Entry<K, V>> toPut = new LinkedList<>(); + List<K> toDelete = new LinkedList<>(); + entries.forEach(e -> { + if (e.getValue() != null) { + toPut.add(e); + } else { + toDelete.add(e.getKey()); + } + }); + + if (!toPut.isEmpty()) { + instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(toPut)); + } + + if (!toDelete.isEmpty()) { + deleteAll(toDelete); + } } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/5154f9eb/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java index 5531951..70cde27 100644 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java @@ -18,9 +18,11 @@ */ package org.apache.samza.storage.kv; -import java.util.Collections; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.samza.config.MapConfig; import org.apache.samza.config.MetricsConfig; @@ -99,15 +101,17 @@ public class TestLocalReadWriteTable { ReadWriteTable table = createTable(false); table.put("k1", "v1"); table.putAsync("k2", "v2").get(); + table.putAsync("k3", null).get(); verify(kvStore, times(2)).put(any(), any()); + verify(kvStore, times(1)).delete(any()); Assert.assertEquals(2, numPuts.getCount()); + Assert.assertEquals(1, numDeletes.getCount()); Assert.assertTrue(putNs.getSnapshot().getAverage() > 0); + Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0); Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); Assert.assertEquals(0, numPutAlls.getCount()); - Assert.assertEquals(0, numDeletes.getCount()); Assert.assertEquals(0, numDeleteAlls.getCount()); Assert.assertEquals(0, numFlushes.getCount()); Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); @@ -117,18 +121,20 @@ public class TestLocalReadWriteTable { @Test public void testPutAll() throws Exception { ReadWriteTable table = createTable(false); - table.putAll(Collections.emptyList()); - table.putAllAsync(Collections.emptyList()).get(); + List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null)); + table.putAll(entries); + table.putAllAsync(entries).get(); verify(kvStore, times(2)).putAll(any()); + verify(kvStore, times(2)).deleteAll(any()); Assert.assertEquals(2, numPutAlls.getCount()); + Assert.assertEquals(2, numDeleteAlls.getCount()); Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0); + Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0); Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); - Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); Assert.assertEquals(0, numPuts.getCount()); Assert.assertEquals(0, numDeletes.getCount()); - Assert.assertEquals(0, numDeleteAlls.getCount()); Assert.assertEquals(0, numFlushes.getCount()); Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); @@ -211,7 +217,7 @@ public class TestLocalReadWriteTable { verify(metricsRegistry, times(0)).newGauge(anyString(), any()); Assert.assertEquals(1, numFlushes.getCount()); Assert.assertEquals(2, numPuts.getCount()); - Assert.assertEquals(2, numPutAlls.getCount()); + Assert.assertEquals(0, numPutAlls.getCount()); Assert.assertEquals(2, numDeletes.getCount()); Assert.assertEquals(2, numDeleteAlls.getCount()); Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); http://git-wip-us.apache.org/repos/asf/samza/blob/5154f9eb/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java index 1f2d586..44802b0 100644 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java @@ -124,13 +124,13 @@ public class TestLocalReadableTable { ReadableTable table = createTable(true); table.get(""); table.getAsync("").get(); - table.getAll(Collections.emptyList()); - table.getAllAsync(Collections.emptyList()).get(); + table.getAll(keys); + table.getAllAsync(keys).get(); verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString()); verify(metricsRegistry, times(0)).newTimer(anyString(), anyString()); verify(metricsRegistry, times(0)).newGauge(anyString(), any()); Assert.assertEquals(2, numGets.getCount()); - Assert.assertEquals(2, numMissedLookups.getCount()); + Assert.assertEquals(4, numMissedLookups.getCount()); Assert.assertEquals(2, numGetAlls.getCount()); Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001); Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
