This is an automated email from the ASF dual-hosted git repository. agura pushed a commit to branch ignite-14389 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit bab8272cb4d361cc9e925861529edc7a06d2d761 Author: Andrey Gura <[email protected]> AuthorDate: Thu Apr 29 02:44:00 2021 +0300 IGNITE-14389 Implemented cursor for ranges and watches. --- .../ignite/internal/metastorage/server/Entry.java | 17 + .../internal/metastorage/server/EntryEvent.java | 58 +++ .../metastorage/server/KeyValueStorage.java | 33 +- .../server/SimpleInMemoryKeyValueStorage.java | 333 +++++++++------ .../ignite/internal/metastorage/server/Value.java | 17 + .../ignite/internal/metastorage/server/Watch.java | 45 --- .../internal/metastorage/server/WatchEvent.java | 54 +++ .../internal/metastorage/server/Watcher.java | 13 - .../internal/metastorage/server/WatcherImpl.java | 58 --- .../server/SimpleInMemoryKeyValueStorageTest.java | 446 ++++++++++++++++++--- 10 files changed, 764 insertions(+), 310 deletions(-) diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java index 263a88b..87b5471 100644 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ignite.internal.metastorage.server; import org.jetbrains.annotations.NotNull; diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java new file mode 100644 index 0000000..554a3a7 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.server; + +/** + * Represents a change event for a particular key and entry. + */ +public class EntryEvent { + /** Old (previous) entry. */ + private final Entry oldEntry; + + /** New (current) entry. 
*/ + private final Entry entry; + + /** + * Constructs event with given old and new entries. + * + * @param oldEntry Old entry. + * @param curEntry New entry. + */ + EntryEvent(Entry oldEntry, Entry curEntry) { + this.oldEntry = oldEntry; + this.entry = curEntry; + } + + /** + * Returns old entry. + * + * @return Old entry. + */ + public Entry oldEntry() { + return oldEntry; + } + + /** + * Returns new entry. + * + * @return New entry. + */ + public Entry entry() { + return entry; + } +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java index 526e4fb..5d6da44 100644 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -1,13 +1,28 @@ -package org.apache.ignite.internal.metastorage.server; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ -import org.jetbrains.annotations.NotNull; +package org.apache.ignite.internal.metastorage.server; import java.util.Collection; -import java.util.Iterator; import java.util.List; +import org.apache.ignite.metastorage.common.Cursor; +import org.jetbrains.annotations.NotNull; public interface KeyValueStorage { - long revision(); long updateCounter(); @@ -44,11 +59,15 @@ public interface KeyValueStorage { @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys); - Iterator<Entry> range(byte[] keyFrom, byte[] keyTo); + Cursor<Entry> range(byte[] keyFrom, byte[] keyTo); + + Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound); + + Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev); - Iterator<Entry> iterate(byte[] key); + Cursor<WatchEvent> watch(byte[] key, long rev); - //Iterator<Entry> iterate(long rev); + Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev); void compact(); } diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index 32f720e..b37c96a 100644 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -1,8 +1,34 @@ -package org.apache.ignite.internal.metastorage.server; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import java.util.*; -import java.util.function.Consumer; +package org.apache.ignite.internal.metastorage.server; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NoSuchElementException; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.function.Predicate; import org.apache.ignite.metastorage.common.Cursor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.TestOnly; @@ -13,15 +39,11 @@ import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE; * WARNING: Only for test purposes and only for non-distributed (one static instance) storage. 
*/ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { - private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare; + private static final Comparator<byte[]> CMP = Arrays::compare; private static final long LATEST_REV = -1; - private final Watcher watcher; - - private final List<Cursor<Entry>> rangeCursors = new ArrayList<>(); - - private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR); + private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP); private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>(); @@ -31,10 +53,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { private final Object mux = new Object(); - public SimpleInMemoryKeyValueStorage(Watcher watcher) { - this.watcher = watcher; - } - @Override public long revision() { return rev; } @@ -141,9 +159,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { List<byte[]> vals = new ArrayList<>(keys.size()); - for (int i = 0; i < keys.size(); i++) { - byte[] key = keys.get(i); - + for (byte[] key : keys) { Entry e = doGet(key, LATEST_REV, false); if (e.empty() || e.tombstone()) @@ -169,9 +185,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { List<byte[]> vals = new ArrayList<>(keys.size()); - for (int i = 0; i < keys.size(); i++) { - byte[] key = keys.get(i); - + for (byte[] key : keys) { Entry e = doGet(key, LATEST_REV, false); res.add(e); @@ -190,90 +204,45 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { return res; } - @Override - public Iterator<Entry> range(byte[] keyFrom, byte[] keyTo) { - return null; + @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) { + return new RangeCursor(keyFrom, keyTo, rev); } - @Override public Iterator<Entry> iterate(byte[] keyFrom) { - synchronized (mux) { - NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true); - - final 
Iterator<Map.Entry<byte[], List<Long>>> it = tailMap.entrySet().iterator(); - - return new Iterator<>() { - private Map.Entry<byte[], List<Long>> curr; - private boolean hasNext; - - private void advance() { - if (it.hasNext()) { - Map.Entry<byte[], List<Long>> e = it.next(); - - byte[] key = e.getKey(); - - if (!isPrefix(keyFrom, key)) - hasNext = false; - else { - curr = e; - - hasNext = true; - } - } else - hasNext = false; - } - - @Override - public boolean hasNext() { - synchronized (mux) { - if (curr == null) - advance(); - - return hasNext; - } - } - - @Override - public Entry next() { - synchronized (mux) { - if (!hasNext()) - throw new NoSuchElementException(); - - Map.Entry<byte[], List<Long>> e = curr; - - curr = null; + @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) { + return new RangeCursor(keyFrom, keyTo, revUpperBound); + } - byte[] key = e.getKey(); + @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) { + assert keyFrom != null; + assert rev > 0; - List<Long> revs = e.getValue(); + return new WatchCursor(rev, k -> + CMP.compare(keyFrom, k) <= 0 && (keyTo == null || CMP.compare(k, keyTo) < 0) + ); + } - long rev = revs == null || revs.isEmpty() ? 
0 : lastRevision(revs); + @Override public Cursor<WatchEvent> watch(byte[] key, long rev) { + assert key != null; + assert rev > 0; - if (rev == 0) { - throw new IllegalStateException("rev == 0"); - //return new AbstractMap.SimpleImmutableEntry<>(key, null); - } + return new WatchCursor(rev, k -> CMP.compare(k, key) == 0); + } - NavigableMap<byte[], Value> vals = revsIdx.get(rev); + @Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev) { + assert keys != null && !keys.isEmpty(); + assert rev > 0; - if (vals == null || vals.isEmpty()) { - throw new IllegalStateException("vals == null || vals.isEmpty()"); - //return new AbstractMap.SimpleImmutableEntry<>(key, null); - } + TreeSet<byte[]> keySet = new TreeSet<>(CMP); - Value val = vals.get(key); + keySet.addAll(keys); - return val.tombstone() ? - Entry.tombstone(key, rev, val.updateCounter()) : - new Entry(key, val.bytes(), rev, val.updateCounter()); - } - } - }; - } + return new WatchCursor(rev, keySet::contains); } + @Override public void compact() { synchronized (mux) { - NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR); + NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(CMP); NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = new TreeMap<>(); @@ -302,7 +271,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { NavigableMap<byte[], Value> compactedKv = compactedRevsIdx.computeIfAbsent( lastRev, - k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR) + k -> new TreeMap<>(CMP) ); compactedKv.put(key, lastVal); @@ -409,7 +378,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { revs.add(curRev); // Update revsIdx. 
- NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR); + NavigableMap<byte[], Value> entries = new TreeMap<>(CMP); Value val = new Value(bytes, curUpdCntr); @@ -423,7 +392,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList) { synchronized (mux) { // Update revsIdx. - NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR); + NavigableMap<byte[], Value> entries = new TreeMap<>(CMP); for (int i = 0; i < keys.size(); i++) { byte[] key = keys.get(i); @@ -452,19 +421,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { } } - - private static boolean isPrefix(byte[] pref, byte[] term) { - if (pref.length > term.length) - return false; - - for (int i = 0; i < pref.length - 1; i++) { - if (pref[i] != term[i]) - return false; - } - - return true; - } - private static long lastRevision(List<Long> revs) { return revs.get(revs.size() - 1); } @@ -481,55 +437,184 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { private final byte[] keyFrom; private final byte[] keyTo; private final long rev; - private byte[] curKey; + private Entry nextRetEntry; + private byte[] lastRetKey; + private boolean finished; - public RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) { + RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) { this.keyFrom = keyFrom; this.keyTo = keyTo; this.rev = rev; } @Override public void close() throws Exception { - + // TODO: implement. 
} @NotNull @Override public Iterator<Entry> iterator() { - return new Iterator<Entry>() { + return new Iterator<>() { @Override public boolean hasNext() { synchronized (mux) { - byte[] key = keysIdx.ceilingKey(curKey); + while (true) { + if (finished) + return false; + + if (nextRetEntry != null) + return true; - return key != null; + byte[] key = lastRetKey; + + while (!finished || nextRetEntry == null) { + Map.Entry<byte[], List<Long>> e = + key == null ? keysIdx.ceilingEntry(keyFrom) : keysIdx.higherEntry(key); + + if (e == null) { + finished = true; + + break; + } + + key = e.getKey(); + + if (keyTo != null && CMP.compare(key, keyTo) >= 0) { + finished = true; + + break; + } + + List<Long> revs = e.getValue(); + + assert revs != null && !revs.isEmpty() : + "Revisions should not be empty: [revs=" + revs + ']'; + + long lastRev = maxRevision(revs, rev); + + if (lastRev == -1) + continue; + + Entry entry = doGetValue(key, lastRev); + + assert !entry.empty() : "Iterator should not return empty entry."; + + nextRetEntry = entry; + + break; + } + } } } @Override public Entry next() { synchronized (mux) { - Map.Entry<byte[], List<Long>> e = keysIdx.ceilingEntry(curKey); - - if (e == null) - throw new NoSuchElementException(); + while (true) { + if (finished) + throw new NoSuchElementException(); - List<Long> revs = e.getValue(); + if (nextRetEntry != null) { + Entry e = nextRetEntry; - assert revs != null && !revs.isEmpty() : - "Revisions should not be empty: [revs=" + revs + ']'; + nextRetEntry = null; - //lastRevision(re) + lastRetKey = e.key(); - return null; + return e; + } else + hasNext(); + } } } }; } + } - @Override public void forEach(Consumer<? 
super Entry> action) { - Cursor.super.forEach(action); + private class WatchCursor implements Cursor<WatchEvent> { + private final Predicate<byte[]> p; + private long lastRetRev; + private long nextRetRev = -1; + + WatchCursor(long rev, Predicate<byte[]> p) { + this.p = p; + this.lastRetRev = rev - 1; } - @Override public Spliterator<Entry> spliterator() { - return Cursor.super.spliterator(); + @Override public void close() throws Exception { + // TODO: implement + } + + @NotNull + @Override public Iterator<WatchEvent> iterator() { + return new Iterator<>() { + @Override public boolean hasNext() { + synchronized (mux) { + if (nextRetRev != -1) + return true; + + while (true) { + long curRev = lastRetRev + 1; + + NavigableMap<byte[], Value> entries = revsIdx.get(curRev); + + if (entries == null) + return false; + + for (byte[] key : entries.keySet()) { + if (p.test(key)) { + nextRetRev = curRev; + + return true; + } + } + + lastRetRev++; + } + } + } + + @Override public WatchEvent next() { + synchronized (mux) { + while (true) { + if (nextRetRev != -1) { + NavigableMap<byte[], Value> entries = revsIdx.get(nextRetRev); + + if (entries == null) + return null; + + List<EntryEvent> evts = new ArrayList<>(entries.size()); + + for (Map.Entry<byte[], Value> e : entries.entrySet()) { + byte[] key = e.getKey(); + + Value val = e.getValue(); + + if (p.test(key)) { + Entry newEntry; + + if (val.tombstone()) + newEntry = Entry.tombstone(key, nextRetRev, val.updateCounter()); + else + newEntry = new Entry(key, val.bytes(), nextRetRev, val.updateCounter()); + + Entry oldEntry = doGet(key, nextRetRev - 1, false); + + evts.add(new EntryEvent(oldEntry, newEntry)); + } + } + + if (evts.isEmpty()) + continue; + + lastRetRev = nextRetRev; + + nextRetRev = -1; + + return new WatchEvent(evts); + } else if (!hasNext()) + return null; + } + } + } + }; } } } diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java 
b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java index 250a5ea..a438fd4 100644 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.ignite.internal.metastorage.server; import org.jetbrains.annotations.NotNull; diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java deleted file mode 100644 index 26cfa5c..0000000 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.apache.ignite.internal.metastorage.server; - -import org.jetbrains.annotations.Nullable; - -import java.util.Arrays; -import java.util.Comparator; - -public class Watch { - private static final Comparator<byte[]> CMP = Arrays::compare; - - private static final long ANY_REVISION = -1; - - @Nullable - private byte[] startKey; - - @Nullable - private byte[] endKey; - - long rev = ANY_REVISION; - - public void startKey(byte[] startKey) { - this.startKey = startKey; - } - - public void endKey(byte[] endKey) { - this.endKey = endKey; - } - - public void revision(long rev) { - this.rev = rev; - } - - public void notify(Entry e) { - if (startKey != null && CMP.compare(e.key(), startKey) < 0) - return; - - if (endKey != null && CMP.compare(e.key(), endKey) > 0) - return; - - if (rev != ANY_REVISION && e.revision() <= rev) - return; - - System.out.println("Entry: key=" + new String(e.key()) + ", value=" + new String(e.value()) + ", rev=" + e.revision()); - } -} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java new file mode 100644 index 0000000..561f203 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.server; + +import java.util.Collection; +import java.util.List; + +public class WatchEvent { + private final List<EntryEvent> entryEvts; + + private final boolean batch; + + /** + * Constructs a watch event with given entry events collection. + * + * @param entryEvts Events for entries corresponding to an update under one revision. 
+ */ + public WatchEvent(List<EntryEvent> entryEvts) { + assert entryEvts != null && !entryEvts.isEmpty(); + + this.batch = entryEvts.size() > 1; + this.entryEvts = entryEvts; + } + + public boolean batch() { + return batch; + } + + public Collection<EntryEvent> entryEvents() { + return entryEvts; + } + + public EntryEvent entryEvent() { + if (batch) + throw new IllegalStateException("Watch event represents a batch of events."); + + return entryEvts.get(0); + } +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java deleted file mode 100644 index 5516d06..0000000 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.apache.ignite.internal.metastorage.server; - -import org.jetbrains.annotations.NotNull; - -public interface Watcher { - void register(@NotNull Watch watch); - - void notify(@NotNull Entry e); - - //TODO: implement - void cancel(@NotNull Watch watch); -} - diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java deleted file mode 100644 index dc126a0..0000000 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.apache.ignite.internal.metastorage.server; - -import org.jetbrains.annotations.NotNull; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -public class WatcherImpl implements Watcher { - private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>(); - - private final List<Watch> watches 
= new ArrayList<>(); - - private volatile boolean stop; - - private final Object mux = new Object(); - - @Override public void register(@NotNull Watch watch) { - synchronized (mux) { - watches.add(watch); - } - } - - @Override public void notify(@NotNull Entry e) { - queue.offer(e); - } - - @Override - public void cancel(@NotNull Watch watch) { - throw new UnsupportedOperationException("Not implemented yet."); - } - - public void shutdown() { - stop = true; - } - - private class WatcherWorker implements Runnable { - @Override public void run() { - while (!stop) { - try { - Entry e = queue.poll(100, TimeUnit.MILLISECONDS); - - if (e != null) { - synchronized (mux) { - watches.forEach(w -> w.notify(e)); - } - } - } - catch (InterruptedException interruptedException) { - // No-op. - } - } - } - } -} - diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java index 4a73137..27df790 100644 --- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java +++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java @@ -1,17 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ignite.internal.metastorage.server; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.TreeMap; -import java.util.function.Function; +import java.util.NoSuchElementException; import java.util.stream.Collectors; +import org.apache.ignite.metastorage.common.Cursor; import org.apache.ignite.metastorage.common.Key; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static java.util.function.Function.identity; import static org.junit.jupiter.api.Assertions.*; class SimpleInMemoryKeyValueStorageTest { @@ -19,7 +36,7 @@ class SimpleInMemoryKeyValueStorageTest { @BeforeEach public void setUp() { - storage = new SimpleInMemoryKeyValueStorage(new NoOpWatcher()); + storage = new SimpleInMemoryKeyValueStorage(); } @Test @@ -91,7 +108,7 @@ class SimpleInMemoryKeyValueStorageTest { assertEquals(4, entries.size()); - Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity())); // Test regular put value. 
Entry e1 = map.get(new Key(key1)); @@ -166,7 +183,7 @@ class SimpleInMemoryKeyValueStorageTest { assertEquals(4, entries.size()); - Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity())); // Test regular put value. Entry e1 = map.get(new Key(key1)); @@ -204,7 +221,7 @@ class SimpleInMemoryKeyValueStorageTest { assertEquals(4, entries.size()); - map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity())); // Test regular put value. e1 = map.get(new Key(key1)); @@ -308,7 +325,7 @@ class SimpleInMemoryKeyValueStorageTest { assertEquals(4, entries.size()); - Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity())); // Test regular put value. Entry e1 = map.get(new Key(key1)); @@ -382,7 +399,7 @@ class SimpleInMemoryKeyValueStorageTest { assertEquals(3, entries.size()); - Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity())); // Test regular put value. Entry e1 = map.get(new Key(key1)); @@ -417,7 +434,7 @@ class SimpleInMemoryKeyValueStorageTest { assertEquals(4, entries.size()); - map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity())); // Test regular put value. 
e1 = map.get(new Key(key1)); @@ -601,7 +618,7 @@ class SimpleInMemoryKeyValueStorageTest { assertEquals(4, entries.size()); - Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity())); // Test regular put value. Entry e1 = map.get(new Key(key1)); @@ -676,7 +693,7 @@ class SimpleInMemoryKeyValueStorageTest { assertEquals(4, entries.size()); - Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity())); // Test regular put value. Entry e1 = map.get(new Key(key1)); @@ -719,7 +736,7 @@ class SimpleInMemoryKeyValueStorageTest { assertEquals(4, entries.size()); - map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity())); + map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity())); // Test regular put value. 
e1 = map.get(new Key(key1));
@@ -987,60 +1004,377 @@ class SimpleInMemoryKeyValueStorageTest {
     }

     @Test
-    public void iterate() {
-        TreeMap<String, String> expFooMap = new TreeMap<>();
-        TreeMap<String, String> expKeyMap = new TreeMap<>();
-        TreeMap<String, String> expZooMap = new TreeMap<>();
+    public void rangeCursor() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);

-        fill("foo", storage, expFooMap);
-        fill("key", storage, expKeyMap);
-        fill("zoo", storage, expZooMap);
+        byte[] key2 = k(2);
+        byte[] val2 = kv(2, 2);

-        assertEquals(300, storage.revision());
-        assertEquals(300, storage.updateCounter());
+        byte[] key3 = k(3);
+        byte[] val3 = kv(3, 3);

-        assertIterate("key", storage, expKeyMap);
-        assertIterate("zoo", storage, expZooMap);
-        assertIterate("foo", storage, expFooMap);
-    }

-    private void assertIterate(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
-        Iterator<Entry> it = storage.iterate((pref + "_").getBytes());
-        Iterator<Map.Entry<String, String>> expIt = expMap.entrySet().iterator();
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        storage.putAll(List.of(key1, key2, key3), List.of(val1, val2, val3));
+
+        assertEquals(1, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        // Range for latest revision without max bound.
+        Cursor<Entry> cur = storage.range(key1, null);
+
+        Iterator<Entry> it = cur.iterator();
+
+        assertTrue(it.hasNext());
+
+        Entry e1 = it.next();
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertArrayEquals(key1, e1.key());
+        assertArrayEquals(val1, e1.value());
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
+
+        assertTrue(it.hasNext());
+
+        Entry e2 = it.next();

-        // Order.
-        while (it.hasNext()) {
-            Entry entry = it.next();
-            Map.Entry<String, String> expEntry = expIt.next();
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(key2, e2.key());
+        assertArrayEquals(val2, e2.value());
+        assertEquals(1, e2.revision());
+        assertEquals(2, e2.updateCounter());
+
+        // Deliberately don't call it.hasNext()
+
+        Entry e3 = it.next();
+
+        assertFalse(e3.empty());
+        assertFalse(e3.tombstone());
+        assertArrayEquals(key3, e3.key());
+        assertArrayEquals(val3, e3.value());
+        assertEquals(1, e3.revision());
+        assertEquals(3, e3.updateCounter());
+
+        assertFalse(it.hasNext());

-            assertEquals(expEntry.getKey(), new String(entry.key()));
-            assertEquals(expEntry.getValue(), new String(entry.value()));
+        try {
+            it.next();
+
+            fail();
+        }
+        catch (NoSuchElementException e) {
+            // Expected: hasNext() just returned false, so next() must throw
+            // NoSuchElementException instead of returning an element.
         }

-        // Range boundaries.
-        it = storage.iterate((pref + '_').getBytes());
+        // Range for latest revision with max bound.
+        cur = storage.range(key1, key3);
+
+        it = cur.iterator();
+
+        assertTrue(it.hasNext());
+
+        e1 = it.next();
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertArrayEquals(key1, e1.key());
+        assertArrayEquals(val1, e1.value());
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());

-        while (it.hasNext()) {
-            Entry entry = it.next();
+        assertTrue(it.hasNext());

-            assertTrue(expMap.containsKey(new String(entry.key())));
+        e2 = it.next();
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(key2, e2.key());
+        assertArrayEquals(val2, e2.value());
+        assertEquals(1, e2.revision());
+        assertEquals(2, e2.updateCounter());
+
+        assertFalse(it.hasNext());
+
+        try {
+            it.next();
+
+            fail();
+        }
+        catch (NoSuchElementException e) {
+            // Expected: hasNext() just returned false, so next() must throw
+            // NoSuchElementException instead of returning an element.
         }
     }

-    private static void fill(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
-        for (int i = 0; i < 100; i++) {
-            String keyStr = pref + '_' + i;
+    @Test
+    public void watchCursorForRange() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);

-            String valStr = "val_" + i;
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);

-            expMap.put(keyStr, valStr);
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());

-            byte[] key = keyStr.getBytes();
+        // Watch for all updates starting from revision 2.
+        Cursor<WatchEvent> cur = storage.watch(key1, null, 2);

-            byte[] val = valStr.getBytes();
+        Iterator<WatchEvent> it = cur.iterator();

-            storage.getAndPut(key, val);
-        }
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
+
+        assertEquals(1, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        // Revision is less than 2.
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key2, key3), List.of(val2_2, val3_1));
+
+        assertEquals(2, storage.revision());
+        assertEquals(4, storage.updateCounter());
+
+        // Revision is 2.
+        assertTrue(it.hasNext());
+
+        WatchEvent watchEvent = it.next();
+
+        assertTrue(watchEvent.batch());
+
+        Map<Key, EntryEvent> map = watchEvent.entryEvents().stream()
+            .collect(Collectors.toMap(evt -> new Key(evt.entry().key()), identity()));
+
+        assertEquals(2, map.size());
+
+        // First update under revision.
+        EntryEvent e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+
+        Entry oldEntry2 = e2.oldEntry();
+
+        assertFalse(oldEntry2.empty());
+        assertFalse(oldEntry2.tombstone());
+        assertEquals(1, oldEntry2.revision());
+        assertEquals(2, oldEntry2.updateCounter());
+        assertArrayEquals(key2, oldEntry2.key());
+        assertArrayEquals(val2_1, oldEntry2.value());
+
+        Entry newEntry2 = e2.entry();
+
+        assertFalse(newEntry2.empty());
+        assertFalse(newEntry2.tombstone());
+        assertEquals(2, newEntry2.revision());
+        assertEquals(3, newEntry2.updateCounter());
+        assertArrayEquals(key2, newEntry2.key());
+        assertArrayEquals(val2_2, newEntry2.value());
+
+        // Second update under revision.
+        EntryEvent e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+
+        Entry oldEntry3 = e3.oldEntry();
+
+        assertTrue(oldEntry3.empty());
+        assertFalse(oldEntry3.tombstone());
+        assertArrayEquals(key3, oldEntry3.key());
+
+        Entry newEntry3 = e3.entry();
+
+        assertFalse(newEntry3.empty());
+        assertFalse(newEntry3.tombstone());
+        assertEquals(2, newEntry3.revision());
+        assertEquals(4, newEntry3.updateCounter());
+        assertArrayEquals(key3, newEntry3.key());
+        assertArrayEquals(val3_1, newEntry3.value());
+
+        assertFalse(it.hasNext());
+
+        storage.remove(key1);
+
+        assertTrue(it.hasNext());
+
+        watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        EntryEvent e1 = watchEvent.entryEvent();
+
+        Entry oldEntry1 = e1.oldEntry();
+
+        assertFalse(oldEntry1.empty());
+        assertFalse(oldEntry1.tombstone());
+        assertEquals(1, oldEntry1.revision());
+        assertEquals(1, oldEntry1.updateCounter());
+        assertArrayEquals(key1, oldEntry1.key());
+        assertArrayEquals(val1_1, oldEntry1.value());
+
+        Entry newEntry1 = e1.entry();
+
+        assertFalse(newEntry1.empty());
+        assertTrue(newEntry1.tombstone());
+        assertEquals(3, newEntry1.revision());
+        assertEquals(5, newEntry1.updateCounter());
+        assertArrayEquals(key1, newEntry1.key());
+        assertNull(newEntry1.value());
+
+        assertFalse(it.hasNext());
+    }
+
+
+    @Test
+    public void watchCursorForKey() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
+        byte[] val1_2 = kv(1, 12);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        Cursor<WatchEvent> cur = storage.watch(key1, 1);
+
+        Iterator<WatchEvent> it = cur.iterator();
+
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
+
+        assertEquals(1, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        assertTrue(it.hasNext());
+
+        WatchEvent watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        EntryEvent e1 = watchEvent.entryEvent();
+
+        Entry oldEntry1 = e1.oldEntry();
+
+        assertTrue(oldEntry1.empty());
+        assertFalse(oldEntry1.tombstone());
+
+        Entry newEntry1 = e1.entry();
+
+        assertFalse(newEntry1.empty());
+        assertFalse(newEntry1.tombstone());
+        assertEquals(1, newEntry1.revision());
+        assertEquals(1, newEntry1.updateCounter());
+        assertArrayEquals(key1, newEntry1.key());
+        assertArrayEquals(val1_1, newEntry1.value());
+
+        assertFalse(it.hasNext());
+
+        storage.put(key2, val2_2);
+
+        assertFalse(it.hasNext());
+
+        storage.put(key1, val1_2);
+
+        assertTrue(it.hasNext());
+
+        watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        e1 = watchEvent.entryEvent();
+
+        oldEntry1 = e1.oldEntry();
+
+        assertFalse(oldEntry1.empty());
+        assertFalse(oldEntry1.tombstone());
+        assertEquals(1, oldEntry1.revision());
+        assertEquals(1, oldEntry1.updateCounter());
+        assertArrayEquals(key1, oldEntry1.key());
+        assertArrayEquals(val1_1, oldEntry1.value());
+
+        newEntry1 = e1.entry();
+
+        assertFalse(newEntry1.empty());
+        assertFalse(newEntry1.tombstone());
+        assertEquals(3, newEntry1.revision());
+        assertEquals(4, newEntry1.updateCounter());
+        assertArrayEquals(key1, newEntry1.key());
+        assertArrayEquals(val1_2, newEntry1.value());
+
+        assertFalse(it.hasNext());
+    }
+
+    @Test
+    public void watchCursorForKeys() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
+        byte[] val1_2 = kv(1, 12);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+        byte[] val3_2 = kv(3, 32);
+
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        Cursor<WatchEvent> cur = storage.watch(List.of(key1, key2), 1);
+
+        Iterator<WatchEvent> it = cur.iterator();
+
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key1, key2, key3), List.of(val1_1, val2_1, val3_1));
+
+        assertEquals(1, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        assertTrue(it.hasNext());
+
+        WatchEvent watchEvent = it.next();
+
+        assertTrue(watchEvent.batch());
+
+        assertFalse(it.hasNext());
+
+        storage.put(key2, val2_2);
+
+        assertTrue(it.hasNext());
+
+        watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        assertFalse(it.hasNext());
+
+        storage.put(key3, val3_2);
+
+        assertFalse(it.hasNext());
     }

     private static void fill(KeyValueStorage storage, int keySuffix, int num) {
@@ -1055,18 +1389,4 @@ class SimpleInMemoryKeyValueStorageTest {
     private static byte[] kv(int k, int v) {
         return ("key" + k + '_' + "val" + v).getBytes();
     }
-
-    private static class NoOpWatcher implements Watcher {
-        @Override public void register(@NotNull Watch watch) {
-            // No-op.
-        }
-
-        @Override public void notify(@NotNull Entry e) {
-            // No-op.
-        }
-
-        @Override public void cancel(@NotNull Watch watch) {
-            // No-op.
-        }
-    }
 }
\ No newline at end of file
