Repository: samza Updated Branches: refs/heads/master 36c4c4b02 -> 161d1c47a
Convert a put to a delete operation in ReadWriteTable and TableWriteFunction when input value is null. Currently, the behavior of putting a null value is inconsistent: it is a delete for RocksDB, not supported in the in-memory store, and handled on a case-by-case basis for remote tables. It is desirable to unify the behavior. Furthermore, it eases the writing of a change captured stream to a table. A change captured stream typically contains three types of events: INSERT, UPDATE and DELETE, and they need to be applied properly when written to a table to produce a correct snapshot. In a change captured stream the payload of a DELETE event is typically null, and this would result in a delete operation on a table in the sendTo() operator. Author: Wei Song <[email protected]> Closes #547 from weisong44/table-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/161d1c47 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/161d1c47 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/161d1c47 Branch: refs/heads/master Commit: 161d1c47a2c7322a7d3197d571a227cce0f1cbbf Parents: 36c4c4b Author: Wei Song <[email protected]> Authored: Thu Jun 7 16:51:30 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Thu Jun 7 16:51:30 2018 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/samza/table/ReadWriteTable.java | 9 ++++++--- .../apache/samza/table/caching/guava/GuavaCacheTable.java | 6 +++++- .../org/apache/samza/table/remote/RemoteReadWriteTable.java | 6 ++++++ .../org/apache/samza/table/remote/TableWriteFunction.java | 7 ++++++- .../samza/storage/kv/LocalStoreBackedReadWriteTable.java | 6 +++++- .../apache/samza/sql/testutil/TestIOResolverFactory.java | 7 ++++--- 6 files changed, 32 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java index d617153..def5afb 100644 --- a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java +++ b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java @@ -19,7 +19,6 @@ package org.apache.samza.table; import java.util.List; - import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.storage.kv.Entry; @@ -36,17 +35,21 @@ public interface ReadWriteTable<K, V> extends ReadableTable<K, V> { /** * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}. * + * The key is deleted from the table if value is {@code null}. + * * @param key the key with which the specified {@code value} is to be associated. * @param value the value with which the specified {@code key} is to be associated. - * @throws NullPointerException if the specified {@code key} or {@code value} is {@code null}. + * @throws NullPointerException if the specified {@code key} is {@code null}. */ void put(K key, V value); /** * Updates the mappings of the specified key-value {@code entries}. * + * A key is deleted from the table if its corresponding value is {@code null}. + * * @param entries the updated mappings to put into this table. - * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key or value. + * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key. 
*/ void putAll(List<Entry<K, V>> entries); http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java index 27bf971..3f8ab51 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java @@ -66,7 +66,11 @@ public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> { @Override public void put(K key, V value) { - cache.put(key, value); + if (value != null) { + cache.put(key, value); + } else { + delete(key); + } } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java index a47e349..a640efb 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java @@ -84,6 +84,12 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem */ @Override public void put(K key, V value) { + + if (value == null) { + delete(key); + return; + } + try { numPuts.inc(); if (rateLimitWrites) { http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java index 3fb8fda..df54878 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java @@ -22,7 +22,6 @@ package org.apache.samza.table.remote; import java.io.Serializable; import java.util.Collection; import java.util.List; - import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.functions.ClosableFunction; import org.apache.samza.operators.functions.InitableFunction; @@ -44,6 +43,9 @@ import org.apache.samza.storage.kv.Entry; public interface TableWriteFunction<K, V> extends Serializable, InitableFunction, ClosableFunction { /** * Store single table {@code record} with specified {@code key}. This method must be thread-safe. + * + * The key is deleted if record is {@code null}. + * * @param key key for the table record * @param record table record to be written */ @@ -51,6 +53,9 @@ public interface TableWriteFunction<K, V> extends Serializable, InitableFunction /** * Store the table {@code records} with specified {@code keys}. This method must be thread-safe. + * + * A key is deleted if its corresponding record is {@code null}. 
+ * * @param records table records to be written */ default void putAll(List<Entry<K, V>> records) { http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java index 4037f60..906ee1d 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java @@ -42,7 +42,11 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab @Override public void put(K key, V value) { - kvStore.put(key, value); + if (value != null) { + kvStore.put(key, value); + } else { + delete(key); + } } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/161d1c47/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java index bbe2a7e..8a20239 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.commons.lang.NotImplementedException; import org.apache.samza.config.Config; import org.apache.samza.container.SamzaContainerContext; @@ -34,9 +33,9 @@ import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import 
org.apache.samza.sql.data.SamzaSqlCompositeKey; import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOResolverFactory; -import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.storage.kv.RocksDbTableDescriptor; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.Table; @@ -91,8 +90,10 @@ public class TestIOResolverFactory implements SqlIOResolverFactory { public void put(Object key, Object value) { if (key == null) { records.put(System.nanoTime(), value); - } else { + } else if (value != null) { records.put(key, value); + } else { + delete(key); } }
