Repository: samza
Updated Branches:
  refs/heads/master 36c4c4b02 -> 161d1c47a


Convert a put to a delete operation in ReadWriteTable and TableWriteFunction
when the input value is null

Currently, the behavior of putting a null value is inconsistent: it is a delete
for RocksDB, it is not supported by the in-memory store, and it is handled on a
case-by-case basis for remote tables. It is desirable to unify the behavior.
Furthermore, doing so eases the writing of a change-capture stream to a table.
A change-capture stream typically contains 3 types of events: INSERT, UPDATE
and DELETE, and they need to be applied properly when written to a table to
produce a correct snapshot. In a change-capture stream the payload of a DELETE
event is typically null, and this would result in a delete operation on the
table in the sendTo() operator.

Author: Wei Song <[email protected]>

Closes #547 from weisong44/table-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/161d1c47
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/161d1c47
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/161d1c47

Branch: refs/heads/master
Commit: 161d1c47a2c7322a7d3197d571a227cce0f1cbbf
Parents: 36c4c4b
Author: Wei Song <[email protected]>
Authored: Thu Jun 7 16:51:30 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Thu Jun 7 16:51:30 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/table/ReadWriteTable.java    | 9 ++++++---
 .../apache/samza/table/caching/guava/GuavaCacheTable.java   | 6 +++++-
 .../org/apache/samza/table/remote/RemoteReadWriteTable.java | 6 ++++++
 .../org/apache/samza/table/remote/TableWriteFunction.java   | 7 ++++++-
 .../samza/storage/kv/LocalStoreBackedReadWriteTable.java    | 6 +++++-
 .../apache/samza/sql/testutil/TestIOResolverFactory.java    | 7 ++++---
 6 files changed, 32 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java 
b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
index d617153..def5afb 100644
--- a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
+++ b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
@@ -19,7 +19,6 @@
 package org.apache.samza.table;
 
 import java.util.List;
-
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.storage.kv.Entry;
 
@@ -36,17 +35,21 @@ public interface ReadWriteTable<K, V> extends 
ReadableTable<K, V> {
   /**
    * Updates the mapping of the specified key-value pair; Associates the 
specified {@code key} with the specified {@code value}.
    *
+   * The key is deleted from the table if value is {@code null}.
+   *
    * @param key the key with which the specified {@code value} is to be 
associated.
    * @param value the value with which the specified {@code key} is to be 
associated.
-   * @throws NullPointerException if the specified {@code key} or {@code 
value} is {@code null}.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
    */
   void put(K key, V value);
 
   /**
    * Updates the mappings of the specified key-value {@code entries}.
    *
+   * A key is deleted from the table if its corresponding value is {@code 
null}.
+   *
    * @param entries the updated mappings to put into this table.
-   * @throws NullPointerException if any of the specified {@code entries} has 
{@code null} as key or value.
+   * @throws NullPointerException if any of the specified {@code entries} has 
{@code null} as key.
    */
   void putAll(List<Entry<K, V>> entries);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
 
b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
index 27bf971..3f8ab51 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
@@ -66,7 +66,11 @@ public class GuavaCacheTable<K, V> implements 
ReadWriteTable<K, V> {
 
   @Override
   public void put(K key, V value) {
-    cache.put(key, value);
+    if (value != null) {
+      cache.put(key, value);
+    } else {
+      delete(key);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index a47e349..a640efb 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -84,6 +84,12 @@ public class RemoteReadWriteTable<K, V> extends 
RemoteReadableTable<K, V> implem
    */
   @Override
   public void put(K key, V value) {
+
+    if (value == null) {
+      delete(key);
+      return;
+    }
+
     try {
       numPuts.inc();
       if (rateLimitWrites) {

http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
index 3fb8fda..df54878 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
@@ -22,7 +22,6 @@ package org.apache.samza.table.remote;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
-
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.functions.ClosableFunction;
 import org.apache.samza.operators.functions.InitableFunction;
@@ -44,6 +43,9 @@ import org.apache.samza.storage.kv.Entry;
 public interface TableWriteFunction<K, V> extends Serializable, 
InitableFunction, ClosableFunction {
   /**
    * Store single table {@code record} with specified {@code key}. This method 
must be thread-safe.
+   *
+   * The key is deleted if record is {@code null}.
+   *
    * @param key key for the table record
    * @param record table record to be written
    */
@@ -51,6 +53,9 @@ public interface TableWriteFunction<K, V> extends 
Serializable, InitableFunction
 
   /**
    * Store the table {@code records} with specified {@code keys}. This method 
must be thread-safe.
+   *
+   * A key is deleted if its corresponding record is {@code null}.
+   *
    * @param records table records to be written
    */
   default void putAll(List<Entry<K, V>> records) {

http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
index 4037f60..906ee1d 100644
--- 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
+++ 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
@@ -42,7 +42,11 @@ public class LocalStoreBackedReadWriteTable<K, V> extends 
LocalStoreBackedReadab
 
   @Override
   public void put(K key, V value) {
-    kvStore.put(key, value);
+    if (value != null) {
+      kvStore.put(key, value);
+    } else {
+      delete(key);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index bbe2a7e..8a20239 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.SamzaContainerContext;
@@ -34,9 +33,9 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlCompositeKey;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.storage.kv.RocksDbTableDescriptor;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
@@ -91,8 +90,10 @@ public class TestIOResolverFactory implements 
SqlIOResolverFactory {
     public void put(Object key, Object value) {
       if (key == null) {
         records.put(System.nanoTime(), value);
-      } else {
+      } else if (value != null) {
         records.put(key, value);
+      } else {
+        delete(key);
       }
     }
 

Reply via email to