This is an automated email from the ASF dual-hosted git repository. agura pushed a commit to branch ignite-14389 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit d15ac8f419ef1ea90a22414930f81b4a11c26456 Author: Andrey Gura <[email protected]> AuthorDate: Fri Apr 23 02:35:28 2021 +0300 IGNITE-14389 Added putAll and removeAll. Started cursor management: ranges and watches (WIP) --- .../metastorage/server/KeyValueStorage.java | 10 + .../server/SimpleInMemoryKeyValueStorage.java | 186 +++++++++- .../server/SimpleInMemoryKeyValueStorageTest.java | 380 +++++++++++++++++++++ 3 files changed, 565 insertions(+), 11 deletions(-) diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java index 0f18ece..526e4fb 100644 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -31,11 +31,21 @@ public interface KeyValueStorage { void putAll(List<byte[]> keys, List<byte[]> values); + @NotNull + Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values); + void remove(byte[] key); @NotNull Entry getAndRemove(byte[] key); + void removeAll(List<byte[]> key); + + @NotNull + Collection<Entry> getAndRemoveAll(List<byte[]> keys); + + Iterator<Entry> range(byte[] keyFrom, byte[] keyTo); + Iterator<Entry> iterate(byte[] key); //Iterator<Entry> iterate(long rev); diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index f532005..32f720e 100644 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -1,15 +1,9 @@ package org.apache.ignite.internal.metastorage.server; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.NoSuchElementException; -import java.util.TreeMap; +import java.util.*; +import java.util.function.Consumer; + +import org.apache.ignite.metastorage.common.Cursor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.TestOnly; @@ -25,6 +19,8 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { private final Watcher watcher; + private final List<Cursor<Entry>> rangeCursors = new ArrayList<>(); + private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR); private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>(); @@ -66,8 +62,25 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { @Override public void putAll(List<byte[]> keys, List<byte[]> values) { synchronized (mux) { + long curRev = rev + 1; + + doPutAll(curRev, keys, values); + } + } + + @Override + public @NotNull Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) { + Collection<Entry> res; + + synchronized (mux) { + long curRev = rev + 1; + res = doGetAll(keys, curRev); + + doPutAll(curRev, keys, values); } + + return res; } @NotNull @@ -119,6 +132,69 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { } } + @Override + public void removeAll(List<byte[]> keys) { + synchronized (mux) { + long curRev = rev + 1; + + List<byte[]> existingKeys = new ArrayList<>(keys.size()); + + List<byte[]> vals = new ArrayList<>(keys.size()); + + for (int i = 0; i < keys.size(); i++) { + byte[] key = keys.get(i); + + Entry e = doGet(key, LATEST_REV, false); + + if (e.empty() || e.tombstone()) + continue; + + existingKeys.add(key); + + vals.add(TOMBSTONE); + } + + doPutAll(curRev, existingKeys, vals); + } + } + + @Override + public @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys) { + Collection<Entry> res = new ArrayList<>(keys.size()); + + synchronized (mux) { + long curRev = rev + 1; + + List<byte[]> existingKeys = new ArrayList<>(keys.size()); + + List<byte[]> vals = new ArrayList<>(keys.size()); + + for (int i = 0; i < keys.size(); i++) { + byte[] key = keys.get(i); + + Entry e = doGet(key, LATEST_REV, false); + + res.add(e); + + if (e.empty() || e.tombstone()) + continue; + + existingKeys.add(key); + + vals.add(TOMBSTONE); + } + + doPutAll(curRev, existingKeys, vals); + } + + return res; + } + + @Override + public Iterator<Entry> range(byte[] keyFrom, byte[] keyTo) { + return null; + } + @Override public Iterator<Entry> iterate(byte[] keyFrom) { synchronized (mux) { NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true); @@ -237,7 +313,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { private Collection<Entry> doGetAll(List<byte[]> keys, long rev) { assert keys != null : "keys list can't be null."; assert !keys.isEmpty() : "keys list can't be empty."; - assert rev > 0 : "Revision must be positive."; + assert rev > 0 || rev == LATEST_REV: "Revision must be positive."; Collection<Entry> res = new ArrayList<>(keys.size()); @@ -344,6 +420,39 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { return lastRev; } + private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList) { + synchronized (mux) { + // Update revsIdx. + NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR); + + for (int i = 0; i < keys.size(); i++) { + byte[] key = keys.get(i); + + byte[] bytes = bytesList.get(i); + + long curUpdCntr = ++updCntr; + + // Update keysIdx. + List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>()); + + long lastRev = revs.isEmpty() ? 0 : lastRevision(revs); + + revs.add(curRev); + + Value val = new Value(bytes, curUpdCntr); + + entries.put(key, val); + + revsIdx.put(curRev, entries); + } + + rev = curRev; + + return curRev; + } + } + + private static boolean isPrefix(byte[] pref, byte[] term) { if (pref.length > term.length) return false; @@ -368,4 +477,59 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { return res; } + private class RangeCursor implements Cursor<Entry> { + private final byte[] keyFrom; + private final byte[] keyTo; + private final long rev; + private byte[] curKey; + + public RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) { + this.keyFrom = keyFrom; + this.keyTo = keyTo; + this.rev = rev; + } + + @Override public void close() throws Exception { + + } + + @NotNull + @Override public Iterator<Entry> iterator() { + return new Iterator<Entry>() { + @Override public boolean hasNext() { + synchronized (mux) { + byte[] key = keysIdx.ceilingKey(curKey); + + return key != null; + } + } + + @Override public Entry next() { + synchronized (mux) { + Map.Entry<byte[], List<Long>> e = keysIdx.ceilingEntry(curKey); + + if (e == null) + throw new NoSuchElementException(); + + List<Long> revs = e.getValue(); + + assert revs != null && !revs.isEmpty() : + "Revisions should not be empty: [revs=" + revs + ']'; + + //lastRevision(re) + + return null; + } + } + }; + } + + @Override public void forEach(Consumer<? super Entry> action) { + Cursor.super.forEach(action); + } + + @Override public Spliterator<Entry> spliterator() { + return Cursor.super.spliterator(); + } + } } diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java index fa130e6..4a73137 100644 --- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java +++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java @@ -272,6 +272,191 @@ class SimpleInMemoryKeyValueStorageTest { } @Test + public void putAll() { + byte[] key1 = k(1); + byte[] val1 = kv(1, 1); + + byte[] key2 = k(2); + byte[] val2_1 = kv(2, 21); + byte[] val2_2 = kv(2, 22); + + byte[] key3 = k(3); + byte[] val3_1 = kv(3, 31); + byte[] val3_2 = kv(3, 32); + + byte[] key4 = k(4); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + // Must be rewritten. + storage.put(key2, val2_1); + + // Remove. Tombstone must be replaced by new value. + storage.put(key3, val3_1); + storage.remove(key3); + + assertEquals(3, storage.revision()); + assertEquals(3, storage.updateCounter()); + + storage.putAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2)); + + assertEquals(4, storage.revision()); + assertEquals(6, storage.updateCounter()); + + Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4)); + + assertEquals(4, entries.size()); + + Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + + // Test regular put value. + Entry e1 = map.get(new Key(key1)); + + assertNotNull(e1); + assertEquals(4, e1.revision()); + assertEquals(4, e1.updateCounter()); + assertFalse(e1.tombstone()); + assertFalse(e1.empty()); + assertArrayEquals(val1, e1.value()); + + // Test rewritten value. + Entry e2 = map.get(new Key(key2)); + + assertNotNull(e2); + assertEquals(4, e2.revision()); + assertEquals(5, e2.updateCounter()); + assertFalse(e2.tombstone()); + assertFalse(e2.empty()); + assertArrayEquals(val2_2, e2.value()); + + // Test removed value. + Entry e3 = map.get(new Key(key3)); + + assertNotNull(e3); + assertEquals(4, e3.revision()); + assertEquals(6, e3.updateCounter()); + assertFalse(e3.tombstone()); + assertFalse(e3.empty()); + + // Test empty value. + Entry e4 = map.get(new Key(key4)); + + assertNotNull(e4); + assertFalse(e4.tombstone()); + assertTrue(e4.empty()); + } + + @Test + public void getAndPutAll() { + byte[] key1 = k(1); + byte[] val1 = kv(1, 1); + + byte[] key2 = k(2); + byte[] val2_1 = kv(2, 21); + byte[] val2_2 = kv(2, 22); + + byte[] key3 = k(3); + byte[] val3_1 = kv(3, 31); + byte[] val3_2 = kv(3, 32); + + byte[] key4 = k(4); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + // Must be rewritten. + storage.put(key2, val2_1); + + // Remove. Tombstone must be replaced by new value. + storage.put(key3, val3_1); + storage.remove(key3); + + assertEquals(3, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Collection<Entry> entries = storage.getAndPutAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2)); + + assertEquals(4, storage.revision()); + assertEquals(6, storage.updateCounter()); + + assertEquals(3, entries.size()); + + Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + + // Test regular put value. + Entry e1 = map.get(new Key(key1)); + + assertNotNull(e1); + assertEquals(0, e1.revision()); + assertEquals(0, e1.updateCounter()); + assertFalse(e1.tombstone()); + assertTrue(e1.empty()); + + // Test rewritten value. + Entry e2 = map.get(new Key(key2)); + + assertNotNull(e2); + assertEquals(1, e2.revision()); + assertEquals(1, e2.updateCounter()); + assertFalse(e2.tombstone()); + assertFalse(e2.empty()); + assertArrayEquals(val2_1, e2.value()); + + // Test removed value. + Entry e3 = map.get(new Key(key3)); + + assertNotNull(e3); + assertEquals(3, e3.revision()); + assertEquals(3, e3.updateCounter()); + assertTrue(e3.tombstone()); + assertFalse(e3.empty()); + + // Test state after putAll. + entries = storage.getAll(List.of(key1, key2, key3, key4)); + + assertEquals(4, entries.size()); + + map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + + // Test regular put value. + e1 = map.get(new Key(key1)); + + assertNotNull(e1); + assertEquals(4, e1.revision()); + assertEquals(4, e1.updateCounter()); + assertFalse(e1.tombstone()); + assertFalse(e1.empty()); + assertArrayEquals(val1, e1.value()); + + // Test rewritten value. + e2 = map.get(new Key(key2)); + + assertNotNull(e2); + assertEquals(4, e2.revision()); + assertEquals(5, e2.updateCounter()); + assertFalse(e2.tombstone()); + assertFalse(e2.empty()); + assertArrayEquals(val2_2, e2.value()); + + // Test removed value. + e3 = map.get(new Key(key3)); + + assertNotNull(e3); + assertEquals(4, e3.revision()); + assertEquals(6, e3.updateCounter()); + assertFalse(e3.tombstone()); + assertFalse(e3.empty()); + + // Test empty value. + Entry e4 = map.get(new Key(key4)); + + assertNotNull(e4); + assertFalse(e4.tombstone()); + assertTrue(e4.empty()); + } + + @Test public void remove() { byte[] key = k(1); byte[] val = kv(1, 1); @@ -377,6 +562,201 @@ class SimpleInMemoryKeyValueStorageTest { } @Test + public void removeAll() { + byte[] key1 = k(1); + byte[] val1 = kv(1, 1); + + byte[] key2 = k(2); + byte[] val2_1 = kv(2, 21); + byte[] val2_2 = kv(2, 22); + + byte[] key3 = k(3); + byte[] val3_1 = kv(3, 31); + + byte[] key4 = k(4); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + // Regular put. + storage.put(key1, val1); + + // Rewrite. + storage.put(key2, val2_1); + storage.put(key2, val2_2); + + // Remove. Tombstone must not be removed again. + storage.put(key3, val3_1); + storage.remove(key3); + + assertEquals(5, storage.revision()); + assertEquals(5, storage.updateCounter()); + + storage.removeAll(List.of(key1, key2, key3, key4)); + + assertEquals(6, storage.revision()); + assertEquals(7, storage.updateCounter()); // Only two keys are updated. + + Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4)); + + assertEquals(4, entries.size()); + + Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + + // Test regular put value. + Entry e1 = map.get(new Key(key1)); + + assertNotNull(e1); + assertEquals(6, e1.revision()); + assertEquals(6, e1.updateCounter()); + assertTrue(e1.tombstone()); + assertFalse(e1.empty()); + + // Test rewritten value. + Entry e2 = map.get(new Key(key2)); + + assertNotNull(e2); + assertEquals(6, e2.revision()); + assertEquals(7, e2.updateCounter()); + assertTrue(e2.tombstone()); + assertFalse(e2.empty()); + + // Test removed value. + Entry e3 = map.get(new Key(key3)); + + assertNotNull(e3); + assertEquals(5, e3.revision()); + assertEquals(5, e3.updateCounter()); + assertTrue(e3.tombstone()); + assertFalse(e3.empty()); + + // Test empty value. + Entry e4 = map.get(new Key(key4)); + + assertNotNull(e4); + assertFalse(e4.tombstone()); + assertTrue(e4.empty()); + } + + @Test + public void getAndRemoveAll() { + byte[] key1 = k(1); + byte[] val1 = kv(1, 1); + + byte[] key2 = k(2); + byte[] val2_1 = kv(2, 21); + byte[] val2_2 = kv(2, 22); + + byte[] key3 = k(3); + byte[] val3_1 = kv(3, 31); + + byte[] key4 = k(4); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + // Regular put. + storage.put(key1, val1); + + // Rewrite. + storage.put(key2, val2_1); + storage.put(key2, val2_2); + + // Remove. Tombstone must not be removed again. + storage.put(key3, val3_1); + storage.remove(key3); + + assertEquals(5, storage.revision()); + assertEquals(5, storage.updateCounter()); + + Collection<Entry> entries = storage.getAndRemoveAll(List.of(key1, key2, key3, key4)); + + assertEquals(6, storage.revision()); + assertEquals(7, storage.updateCounter()); // Only two keys are updated. + + assertEquals(4, entries.size()); + + Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + + // Test regular put value. + Entry e1 = map.get(new Key(key1)); + + assertNotNull(e1); + assertEquals(1, e1.revision()); + assertEquals(1, e1.updateCounter()); + assertFalse(e1.tombstone()); + assertFalse(e1.empty()); + + + // Test rewritten value. + Entry e2 = map.get(new Key(key2)); + + assertNotNull(e2); + assertEquals(3, e2.revision()); + assertEquals(3, e2.updateCounter()); + assertFalse(e2.tombstone()); + assertFalse(e2.empty()); + + + // Test removed value. + Entry e3 = map.get(new Key(key3)); + + assertNotNull(e3); + assertEquals(5, e3.revision()); + assertEquals(5, e3.updateCounter()); + assertTrue(e3.tombstone()); + assertFalse(e3.empty()); + + // Test empty value. + Entry e4 = map.get(new Key(key4)); + + assertNotNull(e4); + assertFalse(e4.tombstone()); + assertTrue(e4.empty()); + + // Test state after getAndRemoveAll. + entries = storage.getAll(List.of(key1, key2, key3, key4)); + + assertEquals(4, entries.size()); + + map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + + // Test regular put value. + e1 = map.get(new Key(key1)); + + assertNotNull(e1); + assertEquals(6, e1.revision()); + assertEquals(6, e1.updateCounter()); + assertTrue(e1.tombstone()); + assertFalse(e1.empty()); + + // Test rewritten value. + e2 = map.get(new Key(key2)); + + assertNotNull(e2); + assertEquals(6, e2.revision()); + assertEquals(7, e2.updateCounter()); + assertTrue(e2.tombstone()); + assertFalse(e2.empty()); + + // Test removed value. + e3 = map.get(new Key(key3)); + + assertNotNull(e3); + assertEquals(5, e3.revision()); + assertEquals(5, e3.updateCounter()); + assertTrue(e3.tombstone()); + assertFalse(e3.empty()); + + // Test empty value. + e4 = map.get(new Key(key4)); + + assertNotNull(e4); + assertFalse(e4.tombstone()); + assertTrue(e4.empty()); + } + + @Test public void getAfterRemove() { byte[] key = k(1); byte[] val = kv(1, 1);
