Repository: samza
Updated Branches:
  refs/heads/master 127707384 -> 5154f9eb8


SAMZA-2010: Handle null value in LocalReadWriteTable.putAll()

To be consistent with put(), entries with null values passed to putAll() are now treated as delete operations.

Author: Wei Song <[email protected]>

Reviewers: Ahmed Abdul Hamid <[email protected]>

Closes #827 from weisong44/SAMZA-2010


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5154f9eb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5154f9eb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5154f9eb

Branch: refs/heads/master
Commit: 5154f9eb8f484126dbd810a350bf15a10645a1f7
Parents: 1277073
Author: Wei Song <[email protected]>
Authored: Thu Nov 29 15:31:58 2018 -0800
Committer: Wei Song <[email protected]>
Committed: Thu Nov 29 15:31:58 2018 -0800

----------------------------------------------------------------------
 .../samza/storage/kv/LocalReadWriteTable.java   | 19 ++++++++++++++++-
 .../storage/kv/TestLocalReadWriteTable.java     | 22 +++++++++++++-------
 .../storage/kv/TestLocalReadableTable.java      |  6 +++---
 3 files changed, 35 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5154f9eb/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
index 98d3768..2704429 100644
--- 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
+++ 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.storage.kv;
 
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.samza.metrics.Counter;
@@ -69,7 +70,23 @@ public class LocalReadWriteTable<K, V> extends 
LocalReadableTable<K, V>
 
   @Override
   public void putAll(List<Entry<K, V>> entries) {
-    instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> 
kvStore.putAll(entries));
+    List<Entry<K, V>> toPut = new LinkedList<>();
+    List<K> toDelete = new LinkedList<>();
+    entries.forEach(e -> {
+        if (e.getValue() != null) {
+          toPut.add(e);
+        } else {
+          toDelete.add(e.getKey());
+        }
+      });
+
+    if (!toPut.isEmpty()) {
+      instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> 
kvStore.putAll(toPut));
+    }
+
+    if (!toDelete.isEmpty()) {
+      deleteAll(toDelete);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/5154f9eb/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
index 5531951..70cde27 100644
--- 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
+++ 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
@@ -18,9 +18,11 @@
  */
 package org.apache.samza.storage.kv;
 
-import java.util.Collections;
+import java.util.Arrays;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.MetricsConfig;
@@ -99,15 +101,17 @@ public class TestLocalReadWriteTable {
     ReadWriteTable table = createTable(false);
     table.put("k1", "v1");
     table.putAsync("k2", "v2").get();
+    table.putAsync("k3", null).get();
     verify(kvStore, times(2)).put(any(), any());
+    verify(kvStore, times(1)).delete(any());
     Assert.assertEquals(2, numPuts.getCount());
+    Assert.assertEquals(1, numDeletes.getCount());
     Assert.assertTrue(putNs.getSnapshot().getAverage() > 0);
+    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
     Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
     Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
     Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
     Assert.assertEquals(0, numPutAlls.getCount());
-    Assert.assertEquals(0, numDeletes.getCount());
     Assert.assertEquals(0, numDeleteAlls.getCount());
     Assert.assertEquals(0, numFlushes.getCount());
     Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
@@ -117,18 +121,20 @@ public class TestLocalReadWriteTable {
   @Test
   public void testPutAll() throws Exception {
     ReadWriteTable table = createTable(false);
-    table.putAll(Collections.emptyList());
-    table.putAllAsync(Collections.emptyList()).get();
+    List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", 
null));
+    table.putAll(entries);
+    table.putAllAsync(entries).get();
     verify(kvStore, times(2)).putAll(any());
+    verify(kvStore, times(2)).deleteAll(any());
     Assert.assertEquals(2, numPutAlls.getCount());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
     Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
     Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
     Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
-    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
     Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
     Assert.assertEquals(0, numPuts.getCount());
     Assert.assertEquals(0, numDeletes.getCount());
-    Assert.assertEquals(0, numDeleteAlls.getCount());
     Assert.assertEquals(0, numFlushes.getCount());
     Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
     Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
@@ -211,7 +217,7 @@ public class TestLocalReadWriteTable {
     verify(metricsRegistry, times(0)).newGauge(anyString(), any());
     Assert.assertEquals(1, numFlushes.getCount());
     Assert.assertEquals(2, numPuts.getCount());
-    Assert.assertEquals(2, numPutAlls.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
     Assert.assertEquals(2, numDeletes.getCount());
     Assert.assertEquals(2, numDeleteAlls.getCount());
     Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);

http://git-wip-us.apache.org/repos/asf/samza/blob/5154f9eb/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
index 1f2d586..44802b0 100644
--- 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
+++ 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
@@ -124,13 +124,13 @@ public class TestLocalReadableTable {
     ReadableTable table = createTable(true);
     table.get("");
     table.getAsync("").get();
-    table.getAll(Collections.emptyList());
-    table.getAllAsync(Collections.emptyList()).get();
+    table.getAll(keys);
+    table.getAllAsync(keys).get();
     verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString());
     verify(metricsRegistry, times(0)).newTimer(anyString(), anyString());
     verify(metricsRegistry, times(0)).newGauge(anyString(), any());
     Assert.assertEquals(2, numGets.getCount());
-    Assert.assertEquals(2, numMissedLookups.getCount());
+    Assert.assertEquals(4, numMissedLookups.getCount());
     Assert.assertEquals(2, numGetAlls.getCount());
     Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
     Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);

Reply via email to