Repository: kafka
Updated Branches:
  refs/heads/trunk 017a21c60 -> 8ffb1a1fe


KAFKA-5711: batch restore should handle deletes

Author: Bill Bejeck <b...@confluent.io>

Reviewers: Damian Guy <damian....@gmail.com>, Guozhang Wang <wangg...@gmail.com>

Closes #3644 from bbejeck/KAFKA-5711_bulk_restore_should_handle_deletes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ffb1a1f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ffb1a1f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ffb1a1f

Branch: refs/heads/trunk
Commit: 8ffb1a1fedad4350dd0f0ce0dd52107f962ce428
Parents: 017a21c
Author: Bill Bejeck <b...@confluent.io>
Authored: Thu Aug 10 11:31:55 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Thu Aug 10 11:31:55 2017 -0700

----------------------------------------------------------------------
 .../streams/state/internals/RocksDBStore.java   |  8 +-
 .../state/internals/RocksDBStoreTest.java       | 89 ++++++++++++++++++++
 2 files changed, 95 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8ffb1a1f/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index fbaeef2..f8e9002 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -283,7 +283,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private void restoreAllInternal(Collection<KeyValue<byte[], byte[]>> 
records) {
         try (WriteBatch batch = new WriteBatch()) {
             for (KeyValue<byte[], byte[]> record : records) {
-                batch.put(record.key, record.value);
+                if (record.value == null) {
+                    batch.remove(record.key);
+                } else {
+                    batch.put(record.key, record.value);
+                }
             }
             db.write(wOptions, batch);
         } catch (RocksDBException e) {
@@ -316,7 +320,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 Objects.requireNonNull(entry.key, "key cannot be null");
                 final byte[] rawKey = serdes.rawKey(entry.key);
                 if (entry.value == null) {
-                    db.delete(rawKey);
+                    batch.remove(rawKey);
                 } else {
                     final byte[] value = serdes.rawValue(entry.value);
                     batch.put(rawKey, value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ffb1a1f/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 831febd..553a134 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -39,11 +40,15 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -149,6 +154,90 @@ public class RocksDBStoreTest {
     }
 
 
+    @Test
+    public void shouldHandleDeletesOnRestoreAll() throws Exception {
+        final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), 
"a".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("2".getBytes("UTF-8"), 
"b".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("3".getBytes("UTF-8"), 
"c".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
+
+        subject.init(context, subject);
+        context.restore(subject.name(), entries);
+
+        final KeyValueIterator<String, String> iterator = subject.all();
+        final Set<String> keys = new HashSet<>();
+
+        while (iterator.hasNext()) {
+            keys.add(iterator.next().key);
+        }
+
+        assertThat(keys, equalTo(Utils.mkSet("2", "3")));
+    }
+
+    @Test
+    public void shouldHandleDeletesAndPutbackOnRestoreAll() throws Exception {
+        final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), 
"a".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("2".getBytes("UTF-8"), 
"b".getBytes("UTF-8")));
+        // this will be deleted
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
+        entries.add(new KeyValue<>("3".getBytes("UTF-8"), 
"c".getBytes("UTF-8")));
+        // this will restore key "1" as WriteBatch applies updates in order
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), 
"restored".getBytes("UTF-8")));
+
+        subject.init(context, subject);
+        context.restore(subject.name(), entries);
+
+        final KeyValueIterator<String, String> iterator = subject.all();
+        final Set<String> keys = new HashSet<>();
+
+        while (iterator.hasNext()) {
+            keys.add(iterator.next().key);
+        }
+
+        assertThat(keys, equalTo(Utils.mkSet("1", "2", "3")));
+
+        assertEquals(subject.get("1"), "restored");
+        assertEquals(subject.get("2"), "b");
+        assertEquals(subject.get("3"), "c");
+    }
+
+    @Test
+    public void shouldRestoreThenDeleteOnRestoreAll() throws Exception {
+
+        final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), 
"a".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("2".getBytes("UTF-8"), 
"b".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("3".getBytes("UTF-8"), 
"c".getBytes("UTF-8")));
+
+        subject.init(context, subject);
+        
+        context.restore(subject.name(), entries);
+
+        assertEquals(subject.get("1"), "a");
+        assertEquals(subject.get("2"), "b");
+        assertEquals(subject.get("3"), "c");
+
+        entries.clear();
+
+        entries.add(new KeyValue<>("2".getBytes("UTF-8"), 
"b".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("3".getBytes("UTF-8"), 
"c".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
+
+        context.restore(subject.name(), entries);
+
+        final KeyValueIterator<String, String> iterator = subject.all();
+        final Set<String> keys = new HashSet<>();
+
+        while (iterator.hasNext()) {
+            keys.add(iterator.next().key);
+        }
+
+        assertThat(keys, equalTo(Utils.mkSet("2", "3")));
+    }
+
+
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullPut() {

Reply via email to