This is an automated email from the ASF dual-hosted git repository. agura pushed a commit to branch ignite-14389 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 5f4ef071849c937792e7afc1ef324e41b47adca1 Author: Andrey Gura <[email protected]> AuthorDate: Tue Mar 30 21:21:07 2021 +0300 IGNITE-14389 Meta storage: in-memory implementation WIP --- modules/metastorage-server/pom.xml | 60 +++++ .../ignite/internal/metastorage/server/Entry.java | 146 +++++++++++ .../metastorage/server/KeyValueStorage.java | 28 +++ .../server/SimpleInMemoryKeyValueStorage.java | 267 ++++++++++++++++++++ .../ignite/internal/metastorage/server/Watch.java | 45 ++++ .../internal/metastorage/server/Watcher.java | 13 + .../internal/metastorage/server/WatcherImpl.java | 58 +++++ .../server/SimpleInMemoryKeyValueStorageTest.java | 274 +++++++++++++++++++++ pom.xml | 1 + 9 files changed, 892 insertions(+) diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml new file mode 100644 index 0000000..3c51fc5 --- /dev/null +++ b/modules/metastorage-server/pom.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent</artifactId> + <version>1</version> + <relativePath>../../parent/pom.xml</relativePath> + </parent> + + <artifactId>metastorage-server</artifactId> + <version>3.0.0-SNAPSHOT</version> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.jetbrains</groupId> + <artifactId>annotations</artifactId> + </dependency> + + <!-- Test dependencies. --> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java new file mode 100644 index 0000000..442aef9 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java @@ -0,0 +1,146 @@ +package org.apache.ignite.internal.metastorage.server; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Represents a storage unit as entry with key, value and revision, where + * <ul> + * <li>key - an unique entry's key represented by an array of bytes. Keys are comparable in lexicographic manner.</li> + * <ul>value - a data which is associated with a key and represented as an array of bytes.</ul> + * <ul>revision - a number which denotes a version of whole meta storage. Each change increments the revision.</ul> + * </ul> + * + * Instance of {@link #Entry} could represents: + * <ul> + * <li>A regular entry which stores a particular key, a value and a revision number.</li> + * <li>An empty entry which denotes absence a regular entry in the meta storage for a given key. + * A revision is 0 for such kind of entry.</li> + * <li>A tombstone entry which denotes that a regular entry for a given key was removed from storage on some revision.</li> + * </ul> + */ +//TODO: Separate client and server entries. Empty and tombstone for client is the same. +public class Entry { + /** Entry key. Couldn't be {@code null}. */ + @NotNull + final private byte[] key; + + /** + * Entry value. + * <p> + * {@code val == null} only for {@link #empty()} and {@link #tombstone()} entries. + * </p> + */ + @Nullable + final private byte[] val; + + /** + * Revision number corresponding to this particular entry. + * <p> + * {@code rev == 0} for {@link #empty()} entry, + * {@code rev > 0} for regular and {@link #tombstone()} entries. + * </p> + */ + final private long rev; + + /** + * Constructor. + * + * @param key Key bytes. Couldn't be {@code null}. + * @param val Value bytes. Couldn't be {@code null}. + * @param rev Revision. + */ + // TODO: It seems user will never create Entry, so we can reduce constructor scope to protected or package-private and reuse it from two-place private constructor. + public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev) { + assert key != null : "key can't be null"; + assert val != null : "value can't be null"; + + this.key = key; + this.val = val; + this.rev = rev; + } + + /** + * Constructor for empty and tombstone entries. + * + * @param key Key bytes. Couldn't be {@code null}. + * @param rev Revision. + */ + private Entry(@NotNull byte[] key, long rev) { + assert key != null : "key can't be null"; + + this.key = key; + this.val = null; + this.rev = rev; + } + + /** + * Creates an instance of empty entry for a given key. + * + * @param key Key bytes. Couldn't be {@code null}. + * @return Empty entry. + */ + @NotNull + public static Entry empty(byte[] key) { + return new Entry(key, 0); + } + + /** + * Creates an instance of tombstone entry for a given key and a revision. + * + * @param key Key bytes. Couldn't be {@code null}. + * @return Empty entry. + */ + @NotNull + public static Entry tombstone(byte[] key, long rev) { + assert rev > 0 : "rev must be positive for tombstone entry."; + + return new Entry(key, rev); + } + + /** + * Returns a key. + * + * @return Key. + */ + @NotNull + public byte[] key() { + return key; + } + + /** + * Returns a value. + * + * @return Value. + */ + @Nullable + public byte[] value() { + return val; + } + + /** + * Returns a revision. + * @return Revision. + */ + public long revision() { + return rev; + } + + /** + * Returns value which denotes whether entry is tombstone or not. + * + * @return {@code True} if entry is tombstone, otherwise - {@code false}. + */ + public boolean tombstone() { + return val == null && rev > 0; + } + + /** + * Returns value which denotes whether entry is empty or not. + * + * @return {@code True} if entry is empty, otherwise - {@code false}. + */ + public boolean empty() { + return val == null && rev == 0; + } +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java new file mode 100644 index 0000000..1bf6b78 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -0,0 +1,28 @@ +package org.apache.ignite.internal.metastorage.server; + +import org.jetbrains.annotations.NotNull; + +import java.util.Iterator; + +public interface KeyValueStorage { + + long revision(); + + @NotNull + Entry put(byte[] key, byte[] value); + + @NotNull + Entry get(byte[] key); + + @NotNull + Entry get(byte[] key, long rev); + + @NotNull + Entry remove(byte[] key); + + Iterator<Entry> iterate(byte[] key); + + //Iterator<Entry> iterate(long rev); + + void compact(); +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java new file mode 100644 index 0000000..9059aec --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -0,0 +1,267 @@ +package org.apache.ignite.internal.metastorage.server; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NoSuchElementException; +import java.util.TreeMap; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.TestOnly; + +/** + * WARNING: Only for test purposes and only for non-distributed (one static instance) storage. + */ +public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { + private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare; + + private static final byte[] TOMBSTONE = new byte[0]; + + private static final long LATEST_REV = -1; + + private final Watcher watcher; + + private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR); + + private NavigableMap<Long, NavigableMap<byte[], byte[]>> revsIdx = new TreeMap<>(); + + private long grev = 0; + + private final Object mux = new Object(); + + public SimpleInMemoryKeyValueStorage(Watcher watcher) { + this.watcher = watcher; + } + + @Override public long revision() { + return grev; + } + + @NotNull + @Override public Entry put(byte[] key, byte[] val) { + synchronized (mux) { + long crev = ++grev; + + // Update keysIdx. + List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>()); + + long lrev = revs.isEmpty() ? 0 : lastRevision(revs); + + revs.add(crev); + + // Update revsIdx. + NavigableMap<byte[], byte[]> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR); + + entries.put(key, val); + + revsIdx.put(crev, entries); + + // Return previous value. + if (lrev == 0) + return Entry.empty(key); + + NavigableMap<byte[], byte[]> lastVal = revsIdx.get(lrev); + + Entry res = new Entry(key, lastVal.get(key), lrev); + + //TODO: notify watchers + + return res; + } + } + + @NotNull + @Override public Entry get(byte[] key) { + synchronized (mux) { + return doGet(key, LATEST_REV); + } + } + + @NotNull + @TestOnly + @Override public Entry get(byte[] key, long rev) { + synchronized (mux) { + return doGet(key, rev); + } + } + + @NotNull + @Override public Entry remove(byte[] key) { + synchronized (mux) { + Entry e = doGet(key, LATEST_REV); + + if (e.value() == null) + return e; + + return put(key, TOMBSTONE); + } + } + + @Override public Iterator<Entry> iterate(byte[] keyFrom) { + synchronized (mux) { + NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true); + + final Iterator<Map.Entry<byte[], List<Long>>> it = tailMap.entrySet().iterator(); + + return new Iterator<>() { + private Map.Entry<byte[], List<Long>> curr; + private boolean hasNext; + + private void advance() { + if (it.hasNext()) { + Map.Entry<byte[], List<Long>> e = it.next(); + + byte[] key = e.getKey(); + + if (!isPrefix(keyFrom, key)) + hasNext = false; + else { + curr = e; + + hasNext = true; + } + } else + hasNext = false; + } + + @Override + public boolean hasNext() { + synchronized (mux) { + if (curr == null) + advance(); + + return hasNext; + } + } + + @Override + public Entry next() { + synchronized (mux) { + if (!hasNext()) + throw new NoSuchElementException(); + + Map.Entry<byte[], List<Long>> e = curr; + + curr = null; + + byte[] key = e.getKey(); + + List<Long> revs = e.getValue(); + + long rev = revs == null || revs.isEmpty() ? 0 : lastRevision(revs); + + if (rev == 0) { + throw new IllegalStateException("rev == 0"); + //return new AbstractMap.SimpleImmutableEntry<>(key, null); + } + + NavigableMap<byte[], byte[]> vals = revsIdx.get(rev); + + if (vals == null || vals.isEmpty()) { + throw new IllegalStateException("vals == null || vals.isEmpty()"); + //return new AbstractMap.SimpleImmutableEntry<>(key, null); + } + + byte[] val = vals.get(key); + + return val == TOMBSTONE ? Entry.tombstone(key, rev) : new Entry(key, val, rev); + } + } + }; + } + } + + @Override public void compact() { + synchronized (mux) { + NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR); + + NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx = new TreeMap<>(); + + keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx)); + + keysIdx = compactedKeysIdx; + + revsIdx = compactedRevsIdx; + } + } + + private void compactForKey( + byte[] key, + List<Long> revs, + NavigableMap<byte[], List<Long>> compactedKeysIdx, + NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx + ) { + Long lrev = lastRevision(revs); + + NavigableMap<byte[], byte[]> kv = revsIdx.get(lrev); + + byte[] lastVal = kv.get(key); + + if (lastVal != TOMBSTONE) { + compactedKeysIdx.put(key, listOf(lrev)); + + NavigableMap<byte[], byte[]> compactedKv = compactedRevsIdx.computeIfAbsent( + lrev, + k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR) + ); + + compactedKv.put(key, lastVal); + } + } + + /** + * Returns entry for given key. + * + * @param key Key. + * @param rev Revision. + * @return Entry for given key. + */ + @NotNull private Entry doGet(byte[] key, long rev) { + List<Long> revs = keysIdx.get(key); + + if (revs == null || revs.isEmpty()) + return Entry.empty(key); + + long lrev = rev == LATEST_REV ? lastRevision(revs) : rev; + + NavigableMap<byte[], byte[]> entries = revsIdx.get(lrev); + + if (entries == null || entries.isEmpty()) + return Entry.empty(key); + + byte[] val = entries.get(key); + + if (val == TOMBSTONE) + return Entry.tombstone(key, lrev); + + return new Entry(key, val , lrev); + } + + private static boolean isPrefix(byte[] pref, byte[] term) { + if (pref.length > term.length) + return false; + + for (int i = 0; i < pref.length - 1; i++) { + if (pref[i] != term[i]) + return false; + } + + return true; + } + + private static long lastRevision(List<Long> revs) { + return revs.get(revs.size() - 1); + } + + private static List<Long> listOf(long val) { + List<Long> res = new ArrayList<>(); + + res.add(val); + + return res; + } + +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java new file mode 100644 index 0000000..26cfa5c --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java @@ -0,0 +1,45 @@ +package org.apache.ignite.internal.metastorage.server; + +import org.jetbrains.annotations.Nullable; + +import java.util.Arrays; +import java.util.Comparator; + +public class Watch { + private static final Comparator<byte[]> CMP = Arrays::compare; + + private static final long ANY_REVISION = -1; + + @Nullable + private byte[] startKey; + + @Nullable + private byte[] endKey; + + long rev = ANY_REVISION; + + public void startKey(byte[] startKey) { + this.startKey = startKey; + } + + public void endKey(byte[] endKey) { + this.endKey = endKey; + } + + public void revision(long rev) { + this.rev = rev; + } + + public void notify(Entry e) { + if (startKey != null && CMP.compare(e.key(), startKey) < 0) + return; + + if (endKey != null && CMP.compare(e.key(), endKey) > 0) + return; + + if (rev != ANY_REVISION && e.revision() <= rev) + return; + + System.out.println("Entry: key=" + new String(e.key()) + ", value=" + new String(e.value()) + ", rev=" + e.revision()); + } +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java new file mode 100644 index 0000000..5516d06 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java @@ -0,0 +1,13 @@ +package org.apache.ignite.internal.metastorage.server; + +import org.jetbrains.annotations.NotNull; + +public interface Watcher { + void register(@NotNull Watch watch); + + void notify(@NotNull Entry e); + + //TODO: implement + void cancel(@NotNull Watch watch); +} + diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java new file mode 100644 index 0000000..dc126a0 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java @@ -0,0 +1,58 @@ +package org.apache.ignite.internal.metastorage.server; + +import org.jetbrains.annotations.NotNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class WatcherImpl implements Watcher { + private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>(); + + private final List<Watch> watches = new ArrayList<>(); + + private volatile boolean stop; + + private final Object mux = new Object(); + + @Override public void register(@NotNull Watch watch) { + synchronized (mux) { + watches.add(watch); + } + } + + @Override public void notify(@NotNull Entry e) { + queue.offer(e); + } + + @Override + public void cancel(@NotNull Watch watch) { + throw new UnsupportedOperationException("Not implemented yet."); + } + + public void shutdown() { + stop = true; + } + + private class WatcherWorker implements Runnable { + @Override public void run() { + while (!stop) { + try { + Entry e = queue.poll(100, TimeUnit.MILLISECONDS); + + if (e != null) { + synchronized (mux) { + watches.forEach(w -> w.notify(e)); + } + } + } + catch (InterruptedException interruptedException) { + // No-op. + } + } + } + } +} + diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java new file mode 100644 index 0000000..f7fb17e --- /dev/null +++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java @@ -0,0 +1,274 @@ +package org.apache.ignite.internal.metastorage.server; + +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class SimpleInMemoryKeyValueStorageTest { + private KeyValueStorage storage; + + @BeforeEach + public void setUp() { + storage = new SimpleInMemoryKeyValueStorage(new NoOpWatcher()); + } + + @Test + void putGetRemoveCompact() { + byte[] key1 = k(1); + byte[] val1_1 = kv(1, 1); + byte[] val1_3 = kv(1, 3); + + byte[] key2 = k(2); + byte[] val2_2 = kv(2, 2); + + assertEquals(0, storage.revision()); + + // Previous entry is empty. + Entry emptyEntry = storage.put(key1, val1_1); + + assertEquals(1, storage.revision()); + assertTrue(emptyEntry.empty()); + + // Entry with rev == 1. + Entry e1_1 = storage.get(key1); + + assertFalse(e1_1.empty()); + assertFalse(e1_1.tombstone()); + assertArrayEquals(key1, e1_1.key()); + assertArrayEquals(val1_1, e1_1.value()); + assertEquals(1, e1_1.revision()); + assertEquals(1, storage.revision()); + + // Previous entry is empty. + emptyEntry = storage.put(key2, val2_2); + + assertEquals(2, storage.revision()); + assertTrue(emptyEntry.empty()); + + // Entry with rev == 2. + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertArrayEquals(key2, e2.key()); + assertArrayEquals(val2_2, e2.value()); + assertEquals(2, e2.revision()); + assertEquals(2, storage.revision()); + + // Previous entry is not empty. + e1_1 = storage.put(key1, val1_3); + + assertFalse(e1_1.empty()); + assertFalse(e1_1.tombstone()); + assertArrayEquals(key1, e1_1.key()); + assertArrayEquals(val1_1, e1_1.value()); + assertEquals(1, e1_1.revision()); + assertEquals(3, storage.revision()); + + // Entry with rev == 3. + Entry e1_3 = storage.get(key1); + + assertFalse(e1_3.empty()); + assertFalse(e1_3.tombstone()); + assertArrayEquals(key1, e1_3.key()); + assertArrayEquals(val1_3, e1_3.value()); + assertEquals(3, e1_3.revision()); + assertEquals(3, storage.revision()); + + // Remove existing entry. + Entry e2_2 = storage.remove(key2); + + assertFalse(e2_2.empty()); + assertFalse(e2_2.tombstone()); + assertArrayEquals(key2, e2_2.key()); + assertArrayEquals(val2_2, e2_2.value()); + assertEquals(2, e2_2.revision()); + assertEquals(4, storage.revision()); // Storage revision is changed. + + // Remove already removed entry. + Entry tombstoneEntry = storage.remove(key2); + + assertFalse(tombstoneEntry.empty()); + assertTrue(tombstoneEntry.tombstone()); + assertEquals(4, storage.revision()); // Storage revision is not changed. + + // Compact and check that tombstones are removed. + storage.compact(); + + assertEquals(4, storage.revision()); + assertTrue(storage.remove(key2).empty()); + assertTrue(storage.get(key2).empty()); + + // Remove existing entry. + e1_3 = storage.remove(key1); + + assertFalse(e1_3.empty()); + assertFalse(e1_3.tombstone()); + assertArrayEquals(key1, e1_3.key()); + assertArrayEquals(val1_3, e1_3.value()); + assertEquals(3, e1_3.revision()); + assertEquals(5, storage.revision()); // Storage revision is changed. + + // Remove already removed entry. + tombstoneEntry = storage.remove(key1); + + assertFalse(tombstoneEntry.empty()); + assertTrue(tombstoneEntry.tombstone()); + assertEquals(5, storage.revision()); // // Storage revision is not changed. + + // Compact and check that tombstones are removed. + storage.compact(); + + assertEquals(5, storage.revision()); + assertTrue(storage.remove(key1).empty()); + assertTrue(storage.get(key1).empty()); + } + + @Test + void compact() { + assertEquals(0, storage.revision()); + + // Compact empty. + storage.compact(); + + assertEquals(0, storage.revision()); + + // Compact non-empty. + fill(storage, 1, 1); + + assertEquals(1, storage.revision()); + + fill(storage, 2, 2); + + assertEquals(3, storage.revision()); + + fill(storage, 3, 3); + + assertEquals(6, storage.revision()); + + storage.remove(k(3)); + + assertEquals(7, storage.revision()); + assertTrue(storage.get(k(3)).tombstone()); + + storage.compact(); + + assertEquals(7, storage.revision()); + + Entry e1 = storage.get(k(1)); + + assertFalse(e1.empty()); + assertFalse(e1.tombstone()); + assertArrayEquals(k(1), e1.key()); + assertArrayEquals(kv(1,1), e1.value()); + assertEquals(1, e1.revision()); + + Entry e2 = storage.get(k(2)); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertArrayEquals(k(2), e2.key()); + assertArrayEquals(kv(2,2), e2.value()); + assertTrue(storage.get(k(2), 2).empty()); + assertEquals(3, e2.revision()); + + Entry e3 = storage.get(k(3)); + + assertTrue(e3.empty()); + assertTrue(storage.get(k(3), 5).empty()); + assertTrue(storage.get(k(3), 6).empty()); + assertTrue(storage.get(k(3), 7).empty()); + } + + @Test + void iterate() { + TreeMap<String, String> expFooMap = new TreeMap<>(); + TreeMap<String, String> expKeyMap = new TreeMap<>(); + TreeMap<String, String> expZooMap = new TreeMap<>(); + + fill("foo", storage, expFooMap); + fill("key", storage, expKeyMap); + fill("zoo", storage, expZooMap); + + assertEquals(300, storage.revision()); + + assertIterate("key", storage, expKeyMap); + assertIterate("zoo", storage, expZooMap); + assertIterate("foo", storage, expFooMap); + } + + private void assertIterate(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) { + Iterator<Entry> it = storage.iterate((pref + "_").getBytes()); + Iterator<Map.Entry<String, String>> expIt = expMap.entrySet().iterator(); + + // Order. + while (it.hasNext()) { + Entry entry = it.next(); + Map.Entry<String, String> expEntry = expIt.next(); + + assertEquals(expEntry.getKey(), new String(entry.key())); + assertEquals(expEntry.getValue(), new String(entry.value())); + } + + // Range boundaries. + it = storage.iterate((pref + '_').getBytes()); + + while (it.hasNext()) { + Entry entry = it.next(); + + assertTrue(expMap.containsKey(new String(entry.key()))); + } + } + + private static void fill(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) { + for (int i = 0; i < 100; i++) { + String keyStr = pref + '_' + i; + + String valStr = "val_" + i; + + expMap.put(keyStr, valStr); + + byte[] key = keyStr.getBytes(); + + byte[] val = valStr.getBytes(); + + storage.put(key, val); + } + } + + private static void fill(KeyValueStorage storage, int keySuffix, int num) { + for (int i = 0; i < num; i++) + storage.put(k(keySuffix), kv(keySuffix, i + 1)); + } + + private static byte[] k(int k) { + return ("key" + k).getBytes(); + } + + private static byte[] kv(int k, int v) { + return ("key" + k + '_' + "val" + v).getBytes(); + } + + private static class NoOpWatcher implements Watcher { + @Override public void register(@NotNull Watch watch) { + // No-op. + } + + @Override public void notify(@NotNull Entry e) { + // No-op. + } + + @Override public void cancel(@NotNull Watch watch) { + // No-op. + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 0f89644..3e7da2e 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,7 @@ <module>modules/metastorage</module> <module>modules/metastorage-client</module> <module>modules/metastorage-common</module> + <module>modules/metastorage-server</module> <module>modules/network</module> <module>modules/raft</module> <module>modules/raft-client</module>
