Repository: kafka
Updated Branches:
  refs/heads/trunk a593db6a2 -> c35c47981


KAFKA-5717; InMemoryKeyValueStore should delete keys with null values during 
restore

Fixed a bug in the InMemoryKeyValueStore restoration where a key with a `null` 
value is written in to the map rather than being deleted.

Author: Damian Guy <damian....@gmail.com>

Reviewers: Bill Bejeck <bbej...@gmail.com>, Guozhang Wang <wangg...@gmail.com>

Closes #3650 from dguy/kafka-5717


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c35c4798
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c35c4798
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c35c4798

Branch: refs/heads/trunk
Commit: c35c4798139bc30e3a380311e45a22ba56fcc918
Parents: a593db6
Author: Damian Guy <damian....@gmail.com>
Authored: Wed Aug 9 20:03:28 2017 +0100
Committer: Damian Guy <damian....@gmail.com>
Committed: Wed Aug 9 20:03:28 2017 +0100

----------------------------------------------------------------------
 .../state/internals/InMemoryKeyValueStore.java  |  4 ++--
 .../internals/InMemoryKeyValueStoreTest.java    | 24 ++++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c35c4798/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 41c6de3..7e24969 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -75,9 +75,9 @@ public class InMemoryKeyValueStore<K, V> implements 
KeyValueStore<K, V> {
             context.register(root, true, new StateRestoreCallback() {
                 @Override
                 public void restore(byte[] key, byte[] value) {
-                    // check value for null, to avoid  deserialization error.
+                    // this is a delete
                     if (value == null) {
-                        put(serdes.keyFrom(key), null);
+                        delete(serdes.keyFrom(key));
                     } else {
                         put(serdes.keyFrom(key), serdes.valueFrom(value));
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c35c4798/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 222ec71..541c003 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -20,6 +20,11 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 
 public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
@@ -42,4 +47,23 @@ public class InMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
         store.init(context, store);
         return store;
     }
+
+    @Test
+    public void shouldRemoveKeysWithNullValues() {
+        store.close();
+        // Add any entries that will be restored to any store
+        // that uses the driver's context ...
+        driver.addEntryToRestoreLog(0, "zero");
+        driver.addEntryToRestoreLog(1, "one");
+        driver.addEntryToRestoreLog(2, "two");
+        driver.addEntryToRestoreLog(3, "three");
+        driver.addEntryToRestoreLog(0, null);
+
+        store = createKeyValueStore(driver.context(), Integer.class, 
String.class, true);
+        context.restore(store.name(), driver.restoredEntries());
+
+        assertEquals(3, driver.sizeOf(store));
+
+        assertThat(store.get(0), nullValue());
+    }
 }

Reply via email to