This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f1ac1dba18 IGNITE-23288 Throw CompactedException for metastorage 
cursors and publishers (#4536)
f1ac1dba18 is described below

commit f1ac1dba183c03e76cd3fd838dbd8e19cd5c1d54
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Oct 11 21:02:18 2024 +0300

    IGNITE-23288 Throw CompactedException for metastorage cursors and 
publishers (#4536)
---
 .../internal/metastorage/MetaStorageManager.java   | 219 +++++++++------------
 .../metastorage/impl/MetaStorageManagerImpl.java   |  21 +-
 .../metastorage/server/KeyValueStorage.java        |  40 ++--
 .../metastorage/server/KeyValueStorageUtils.java   |   5 +-
 .../server/persistence/RocksDbKeyValueStorage.java | 219 ++++++++-------------
 .../AbstractCompactionKeyValueStorageTest.java     |  34 ++++
 .../server/SimpleInMemoryKeyValueStorage.java      |  96 ++++-----
 7 files changed, 280 insertions(+), 354 deletions(-)

diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 9cf32ec6b4..e4c6f7fd7e 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -54,72 +54,29 @@ public interface MetaStorageManager extends IgniteComponent 
{
     long appliedRevision();
 
     /**
-     * Returns a future of getting the latest version of an entry by key from 
the metastore leader.
+     * Returns a future of getting the latest version of an entry by key from 
the metastorage leader.
      *
      * <p>Never completes with a {@link CompactedException}.</p>
      *
      * <p>Future may complete with {@link NodeStoppingException} if the node 
is in the process of stopping.</p>
      *
-     * @param key The key.
+     * @param key Key.
      */
     CompletableFuture<Entry> get(ByteArray key);
 
     /**
-     * Returns a future of getting an entry for the given key and the revision 
upper bound from the metastore leader.
+     * Returns a future of getting an entry for the given key and the revision 
upper bound from the metastorage leader.
      *
      * <p>Future may complete with exceptions:</p>
      * <ul>
      *     <li>{@link NodeStoppingException} - if the node is in the process 
of stopping.</li>
-     *     <li>{@link CompactedException} - If the requested entry was not 
found and the {@code revUpperBound} is less than or equal to the
-     *     last compacted one.</li>
-     * </ul>
-     *
-     * <p>Let's consider examples of the work of the method and compaction of 
the metastore. Let's assume that we have keys with revisions
-     * "foo" [1, 2] and "bar" [1, 2 (tombstone)], and the key "some" has never 
been in the metastore.</p>
-     * <ul>
-     *     <li>Compaction revision is {@code 1}.
-     *     <ul>
-     *         <li>get("foo", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("foo", 2) - will return a single value with revision 
2.</li>
-     *         <li>get("foo", 3) - will return a single value with revision 
2.</li>
-     *         <li>get("bar", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("bar", 2) - will return a single value with revision 
2.</li>
-     *         <li>get("bar", 3) - will return a single value with revision 
2.</li>
-     *         <li>get("some", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 2) - will return an empty value.</li>
-     *         <li>get("some", 3) - will return an empty value.</li>
-     *     </ul>
-     *     </li>
-     *     <li>Compaction revision is {@code 2}.
-     *     <ul>
-     *         <li>get("foo", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("foo", 2) - will return a single value with revision 
2.</li>
-     *         <li>get("foo", 3) - will return a single value with revision 
2.</li>
-     *         <li>get("bar", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("bar", 2) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("bar", 3) - will return a single value with revision 
2.</li>
-     *         <li>get("some", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 2) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 3) - will return an empty value.</li>
-     *     </ul>
-     *     </li>
-     *     <li>Compaction revision is {@code 3}.
-     *     <ul>
-     *         <li>get("foo", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("foo", 2) - will return a single value with revision 
2.</li>
-     *         <li>get("foo", 3) - will return a single value with revision 
2.</li>
-     *         <li>get("bar", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("bar", 2) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("bar", 3) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 2) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 3) - a {@link CompactedException} will be 
thrown.</li>
-     *     </ul>
-     *     </li>
+     *     <li>{@link CompactedException} - if the requested entry was not 
found and the {@code revUpperBound} is less than or equal to the
+     *     last compacted one. For examples see {@link #getLocally(ByteArray, 
long)}.</li>
      * </ul>
      *
-     * @param key The key.
-     * @param revUpperBound The upper bound of revision.
+     * @param key Key.
+     * @param revUpperBound Upper bound of revision (inclusive).
+     * @see #getLocally(ByteArray, long)
      */
     CompletableFuture<Entry> get(ByteArray key, long revUpperBound);
 
@@ -216,53 +173,52 @@ public interface MetaStorageManager extends 
IgniteComponent {
      * <p>This method doesn't wait for the storage's revision to become 
greater or equal to the revUpperBound parameter, so it is
      * up to user to wait for the appropriate time to call this method.
      *
-     * <p>Let's consider examples of the work of the method and compaction of 
the metastore. Let's assume that we have keys with revisions
-     * "foo" [1, 2] and "bar" [1, 2 (tombstone)], and the key "some" has never 
been in the metastore.</p>
+     * <p>Let's consider examples of the work of the method and compaction of 
the metastorage. Let's assume that we have keys with revisions
+     * "foo" [1, 2] and "bar" [1, 2 (tombstone)], and the key "some" has never 
been in the metastorage.</p>
      * <ul>
      *     <li>Compaction revision is {@code 1}.
      *     <ul>
-     *         <li>get("foo", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("foo", 2) - will return a single value with revision 
2.</li>
-     *         <li>get("foo", 3) - will return a single value with revision 
2.</li>
-     *         <li>get("bar", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("bar", 2) - will return a single value with revision 
2.</li>
-     *         <li>get("bar", 3) - will return a single value with revision 
2.</li>
-     *         <li>get("some", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 2) - will return an empty value.</li>
-     *         <li>get("some", 3) - will return an empty value.</li>
+     *         <li>getLocally("foo", 1) - a {@link CompactedException} will be 
thrown.</li>
+     *         <li>getLocally("foo", 2) - will return a single value with 
revision 2.</li>
+     *         <li>getLocally("foo", 3) - will return a single value with 
revision 2.</li>
+     *         <li>getLocally("bar", 1) - a {@link CompactedException} will be 
thrown.</li>
+     *         <li>getLocally("bar", 2) - will return a single value with 
revision 2.</li>
+     *         <li>getLocally("bar", 3) - will return a single value with 
revision 2.</li>
+     *         <li>getLocally("some", 1) - a {@link CompactedException} will 
be thrown.</li>
+     *         <li>getLocally("some", 2) - will return an empty value.</li>
+     *         <li>getLocally("some", 3) - will return an empty value.</li>
      *     </ul>
      *     </li>
      *     <li>Compaction revision is {@code 2}.
      *     <ul>
-     *         <li>get("foo", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("foo", 2) - will return a single value with revision 
2.</li>
-     *         <li>get("foo", 3) - will return a single value with revision 
2.</li>
-     *         <li>get("bar", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("bar", 2) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("bar", 3) - will return a single value with revision 
2.</li>
-     *         <li>get("some", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 2) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 3) - will return an empty value.</li>
+     *         <li>getLocally("foo", 1) - a {@link CompactedException} will be 
thrown.</li>
+     *         <li>getLocally("foo", 2) - will return a single value with 
revision 2.</li>
+     *         <li>getLocally("foo", 3) - will return a single value with 
revision 2.</li>
+     *         <li>getLocally("bar", 1) - a {@link CompactedException} will be 
thrown.</li>
+     *         <li>getLocally("bar", 2) - a {@link CompactedException} will be 
thrown.</li>
+     *         <li>getLocally("bar", 3) - will return a single value with 
revision 2.</li>
+     *         <li>getLocally("some", 1) - a {@link CompactedException} will 
be thrown.</li>
+     *         <li>getLocally("some", 2) - a {@link CompactedException} will 
be thrown.</li>
+     *         <li>getLocally("some", 3) - will return an empty value.</li>
      *     </ul>
      *     </li>
      *     <li>Compaction revision is {@code 3}.
      *     <ul>
-     *         <li>get("foo", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("foo", 2) - will return a single value with revision 
2.</li>
-     *         <li>get("foo", 3) - will return a single value with revision 
2.</li>
-     *         <li>get("bar", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("bar", 2) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("bar", 3) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 1) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 2) - a {@link CompactedException} will be 
thrown.</li>
-     *         <li>get("some", 3) - a {@link CompactedException} will be 
thrown.</li>
+     *         <li>getLocally("foo", 1) - a {@link CompactedException} will be 
thrown.</li>
+     *         <li>getLocally("foo", 2) - will return a single value with 
revision 2.</li>
+     *         <li>getLocally("foo", 3) - will return a single value with 
revision 2.</li>
+     *         <li>getLocally("bar", 1) - a {@link CompactedException} will be 
thrown.</li>
+     *         <li>getLocally("bar", 2) - a {@link CompactedException} will be 
thrown.</li>
+     *         <li>getLocally("bar", 3) - a {@link CompactedException} will be 
thrown.</li>
+     *         <li>getLocally("some", 1) - a {@link CompactedException} will 
be thrown.</li>
+     *         <li>getLocally("some", 2) - a {@link CompactedException} will 
be thrown.</li>
+     *         <li>getLocally("some", 3) - a {@link CompactedException} will 
be thrown.</li>
      *     </ul>
      *     </li>
      * </ul>
      *
-     * @param key The key.
-     * @param revUpperBound The upper bound of revision.
-     * @return Value corresponding to the given key.
+     * @param key Key.
+     * @param revUpperBound Upper bound of revision (inclusive).
      * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if the node is in the process of stopping.
      * @throws CompactedException If the requested entry was not found and the 
{@code revUpperBound} is less than or equal to the last
      *      compacted one.
@@ -270,29 +226,37 @@ public interface MetaStorageManager extends 
IgniteComponent {
     Entry getLocally(ByteArray key, long revUpperBound);
 
     /**
-     * Returns cursor by entries which correspond to the given keys range and 
bounded by revision number. The entries in the cursor
-     * are obtained from the local storage.
+     * Returns a cursor over local entries which correspond to the given key range and 
are bounded by the revision number.
+     *
+     * <p>Cursor will iterate over a snapshot of keys and their revisions at 
the time the method was invoked.</p>
+     *
+     * <p>Cursor methods never throw {@link CompactedException}.</p>
      *
      * <p>This method doesn't wait for the storage's revision to become 
greater or equal to the revUpperBound parameter, so it is
-     * up to user to wait for the appropriate time to call this method.
+     * up to user to wait for the appropriate time to call this method.</p>
      *
      * @param startKey Start key of range (inclusive).
-     * @param endKey Last key of range (exclusive).
-     * @param revUpperBound Upper bound of revision.
-     * @return Cursor by entries which correspond to the given keys range.
+     * @param endKey Last key of range (exclusive), {@code null} represents an 
unbound range.
+     * @param revUpperBound Upper bound of revision (inclusive) for each key.
+     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if the node is in the process of stopping.
+     * @throws CompactedException If the {@code revUpperBound} is less than or 
equal to the last compacted one.
      */
-    Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long 
revUpperBound);
+    Cursor<Entry> getLocally(ByteArray startKey, @Nullable ByteArray endKey, 
long revUpperBound);
 
     /**
-     * Returns cursor by entries which correspond to the given key prefix and 
bounded by revision number. The entries in the cursor
-     * are obtained from the local storage.
+     * Returns a cursor over local entries which correspond to the given key prefix and 
are bounded by the revision number.
+     *
+     * <p>Cursor will iterate over a snapshot of keys and their revisions at 
the time the method was invoked.</p>
+     *
+     * <p>Cursor methods never throw {@link CompactedException}.</p>
      *
      * <p>This method doesn't wait for the storage's revision to become 
greater or equal to the revUpperBound parameter, so it is
-     * up to user to wait for the appropriate time to call this method.
+     * up to user to wait for the appropriate time to call this method.</p>
      *
      * @param keyPrefix Key prefix.
-     * @param revUpperBound Upper bound of revision.
-     * @return Cursor by entries which correspond to the given key prefix.
+     * @param revUpperBound Upper bound of revision (inclusive) for each key.
+     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if the node is in the process of stopping.
+     * @throws CompactedException If the {@code revUpperBound} is less than or 
equal to the last compacted one.
      */
     Cursor<Entry> prefixLocally(ByteArray keyPrefix, long revUpperBound);
 
@@ -310,7 +274,7 @@ public interface MetaStorageManager extends IgniteComponent 
{
     HybridTimestamp timestampByRevisionLocally(long revision);
 
     /**
-     * Returns a future of getting the latest version of entries corresponding 
to the given keys from the metastore leader.
+     * Returns a future of getting the latest version of entries corresponding 
to the given keys from the metastorage leader.
      *
      * <p>Never completes with a {@link CompactedException}.</p>
      *
@@ -341,48 +305,49 @@ public interface MetaStorageManager extends 
IgniteComponent {
     CompletableFuture<Void> removeAll(Set<ByteArray> keys);
 
     /**
-     * Retrieves entries for the given key prefix in lexicographic order. 
Shortcut for {@link #prefix(ByteArray, long)} where
-     * {@code revUpperBound = LATEST_REVISION}.
+     * Returns a publisher for getting the latest versions of entries for 
the given key prefix from the metastorage leader.
      *
-     * @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be 
{@code null}.
-     * @return Publisher that will provide entries corresponding to the given 
prefix. This Publisher may also fail (by calling
-     *     {@link Subscriber#onError}) with one of the following exceptions:
-     *     <ul>
-     *         <li>{@link OperationTimeoutException} - if the operation is 
timed out;</li>
-     *         <li>{@link CompactedException} - if the desired revisions are 
removed from the storage due to a compaction;</li>
-     *         <li>{@link NodeStoppingException} - if this node has been 
stopped.</li>
-     *     </ul>
+     * <p>Never fails with a {@link CompactedException}.</p>
+     *
+     * <p>Publisher may fail (by calling {@link Subscriber#onError}) with one 
of the following exceptions:</p>
+     * <ul>
+     *     <li>{@link NodeStoppingException} - if the node is in the process 
of stopping.</li>
+     *     <li>{@link OperationTimeoutException} - if the operation is timed 
out.</li>
+     * </ul>
+     *
+     * @param keyPrefix Key prefix.
      */
     Publisher<Entry> prefix(ByteArray keyPrefix);
 
     /**
-     * Retrieves entries for the given key prefix in lexicographic order. 
Entries will be filtered out by upper bound of given revision
-     * number.
+     * Returns a publisher for getting entries for the given key prefix, bounded 
by the revision upper bound, from the metastorage leader.
      *
-     * @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be 
{@code null}.
-     * @param revUpperBound The upper bound for entry revision or {@link 
MetaStorageManager#LATEST_REVISION} for no revision bound.
-     * @return Publisher that will provide entries corresponding to the given 
prefix and revision. This Publisher may also fail (by calling
-     *     {@link Subscriber#onError}) with one of the following exceptions:
-     *     <ul>
-     *         <li>{@link OperationTimeoutException} - if the operation is 
timed out;</li>
-     *         <li>{@link CompactedException} - if the desired revisions are 
removed from the storage due to a compaction;</li>
-     *         <li>{@link NodeStoppingException} - if this node has been 
stopped.</li>
-     *     </ul>
+     * <p>Publisher may fail (by calling {@link Subscriber#onError}) with one 
of the following exceptions:</p>
+     * <ul>
+     *     <li>{@link NodeStoppingException} - if the node is in the process 
of stopping.</li>
+     *     <li>{@link OperationTimeoutException} - if the operation is timed 
out.</li>
+     *     <li>{@link CompactedException} - if the {@code revUpperBound} is 
less than or equal to the last compacted one on the metastorage
+     *     leader; may occur while processing any batch of entries.</li>
+     * </ul>
+     *
+     * @param keyPrefix Key prefix.
+     * @param revUpperBound Upper bound of revision (inclusive) for each key.
      */
     Publisher<Entry> prefix(ByteArray keyPrefix, long revUpperBound);
 
     /**
-     * Retrieves entries for the given key range in lexicographic order.
+     * Returns a publisher for getting the latest versions of entries for 
the given key range from the metastorage leader.
      *
-     * @param keyFrom Range lower bound (inclusive).
-     * @param keyTo Range upper bound (exclusive), {@code null} represents an 
unbound range.
-     * @return Publisher that will provide entries corresponding to the given 
range. This Publisher may also fail (by calling
-     *     {@link Subscriber#onError}) with one of the following exceptions:
-     *     <ul>
-     *         <li>{@link OperationTimeoutException} - if the operation is 
timed out;</li>
-     *         <li>{@link CompactedException} - if the desired revisions are 
removed from the storage due to a compaction;</li>
-     *         <li>{@link NodeStoppingException} - if this node has been 
stopped.</li>
-     *     </ul>
+     * <p>Never fails with a {@link CompactedException}.</p>
+     *
+     * <p>Publisher may fail (by calling {@link Subscriber#onError}) with one 
of the following exceptions:</p>
+     * <ul>
+     *     <li>{@link NodeStoppingException} - if the node is in the process 
of stopping.</li>
+     *     <li>{@link OperationTimeoutException} - if the operation is timed 
out.</li>
+     * </ul>
+     *
+     * @param keyFrom Start key of range (inclusive).
+     * @param keyTo Last key of range (exclusive), {@code null} represents an 
unbound range.
      */
     Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray keyTo);
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index fee2698fcf..8b18fd6d9d 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -764,15 +764,17 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
     @Override
     public Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long 
revUpperBound) {
-        return storage.range(startKey.bytes(), endKey.bytes(), revUpperBound);
+        return inBusyLock(busyLock, () -> storage.range(startKey.bytes(), 
endKey == null ? null : endKey.bytes(), revUpperBound));
     }
 
     @Override
     public Cursor<Entry> prefixLocally(ByteArray keyPrefix, long 
revUpperBound) {
-        byte[] rangeStart = keyPrefix.bytes();
-        byte[] rangeEnd = storage.nextKey(rangeStart);
+        return inBusyLock(busyLock, () -> {
+            byte[] rangeStart = keyPrefix.bytes();
+            byte[] rangeEnd = storage.nextKey(rangeStart);
 
-        return storage.range(rangeStart, rangeEnd, revUpperBound);
+            return storage.range(rangeStart, rangeEnd, revUpperBound);
+        });
     }
 
     @Override
@@ -916,21 +918,12 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
     @Override
     public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray 
keyTo) {
-        return range(keyFrom, keyTo, false);
-    }
-
-    /**
-     * Retrieves entries for the given key range in lexicographic order.
-     *
-     * @see MetaStorageService#range(ByteArray, ByteArray, boolean)
-     */
-    public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray 
keyTo, boolean includeTombstones) {
         if (!busyLock.enterBusy()) {
             return new NodeStoppingPublisher<>();
         }
 
         try {
-            return new 
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc -> 
svc.range(keyFrom, keyTo, includeTombstones)));
+            return new 
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc -> 
svc.range(keyFrom, keyTo, false)));
         } finally {
             busyLock.leaveBusy();
         }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 0120df0c88..ccbaa1d580 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -67,8 +67,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
     /**
      * Returns an entry by the given key and bounded by the given revision.
      *
-     * <p>Let's consider examples of the work of the method and compaction of 
the metastore. Let's assume that we have keys with revisions
-     * "foo" [1, 2] and "bar" [1, 2 (tombstone)], and the key "some" has never 
been in the metastore.</p>
+     * <p>Let's consider examples of the work of the method and compaction of 
the metastorage. Let's assume that we have keys with
+     * revisions "foo" [1, 2] and "bar" [1, 2 (tombstone)], and the key "some" 
has never been in the metastorage.</p>
      * <ul>
      *     <li>Compaction revision is {@code 1}.
      *     <ul>
@@ -112,7 +112,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
      * </ul>
      *
      * @param key Key.
-     * @param revUpperBound The upper bound of revision.
+     * @param revUpperBound Upper bound of revision (inclusive).
      * @throws CompactedException If the requested entry was not found and the 
{@code revUpperBound} is less than or equal to the last
      *      {@link #setCompactionRevision compacted} one.
      */
@@ -213,7 +213,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
      * Returns entries corresponding to the given keys and bounded by the 
given revision.
      *
      * @param keys Not empty keys.
-     * @param revUpperBound Upper bound of revision.
+     * @param revUpperBound Upper bound of revision (inclusive).
      * @throws CompactedException If getting any of the individual entries 
would have thrown this exception as if
      *      {@link #get(byte[], long)} was used.
      * @see #get(byte[], long)
@@ -285,21 +285,29 @@ public interface KeyValueStorage extends 
ManuallyCloseable {
     StatementResult invoke(If iif, HybridTimestamp opTs, CommandId commandId);
 
     /**
-     * Returns cursor by entries which correspond to the given keys range.
+     * Returns a cursor over the latest entries which correspond to the given key 
range.
+     *
+     * <p>Cursor will iterate over a snapshot of keys and their revisions at 
the time the method was invoked.</p>
+     *
+     * <p>Neither this method nor the cursor methods ever throw 
{@link CompactedException}.</p>
      *
      * @param keyFrom Start key of range (inclusive).
-     * @param keyTo Last key of range (exclusive).
-     * @return Cursor by entries which correspond to the given keys range.
+     * @param keyTo Last key of range (exclusive), {@code null} represents an 
unbound range.
      */
     Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo);
 
     /**
      * Returns cursor by entries which correspond to the given keys range and 
bounded by revision number.
      *
+     * <p>Cursor will iterate over a snapshot of keys and their revisions at 
the time the method was invoked.</p>
+     *
+     * <p>Cursor methods never throw {@link CompactedException}.</p>
+     *
      * @param keyFrom Start key of range (inclusive).
-     * @param keyTo Last key of range (exclusive).
-     * @param revUpperBound Upper bound of revision.
-     * @return Cursor by entries which correspond to the given keys range.
+     * @param keyTo Last key of range (exclusive), {@code null} represents an 
unbound range.
+     * @param revUpperBound Upper bound of revision (inclusive) for each key.
+     * @throws CompactedException If the {@code revUpperBound} is less than or 
equal to the last {@link #setCompactionRevision compacted}
+     *      one.
      */
     Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long 
revUpperBound);
 
@@ -307,7 +315,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
      * Creates subscription on updates of entries corresponding to the given 
keys range and starting from the given revision number.
      *
      * @param keyFrom Start key of range (inclusive).
-     * @param keyTo Last key of range (exclusive).
+     * @param keyTo Last key of range (exclusive), {@code null} represents an 
unbound range.
      * @param rev Start revision number.
      */
     void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev, 
WatchListener listener);
@@ -315,8 +323,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
     /**
      * Registers a watch listener for the provided key.
      *
-     * @param key Meta Storage key.
-     * @param rev Starting Meta Storage revision.
+     * @param key Key.
+     * @param rev Start revision number.
      * @param listener Listener which will be notified for each update.
      */
     void watchExact(byte[] key, long rev, WatchListener listener);
@@ -324,8 +332,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
     /**
      * Registers a watch listener for the provided keys.
      *
-     * @param keys Meta Storage keys.
-     * @param rev Starting Meta Storage revision.
+     * @param keys Not empty keys.
+     * @param rev Start revision number.
      * @param listener Listener which will be notified for each update.
      */
     void watchExact(Collection<byte[]> keys, long rev, WatchListener listener);
@@ -333,7 +341,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
     /**
      * Starts all registered watches.
      *
-     * <p>Before calling this method, watches will not receive any updates.
+     * <p>Before calling this method, watches will not receive any updates.</p>
      *
      * @param startRevision Revision to start processing updates from.
      * @param revisionCallback Callback that will be invoked after all watches 
of a particular revision are processed, with the
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
index 705907e08a..f7aaab1171 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Arrays.binarySearch;
 
 import java.util.function.LongPredicate;
+import org.jetbrains.annotations.Nullable;
 
 /** Helper class with useful methods and constants for {@link KeyValueStorage} 
implementations. */
 public class KeyValueStorageUtils {
@@ -110,8 +111,8 @@ public class KeyValueStorageUtils {
      *
      * @param bytes Bytes.
      */
-    public static String toUtf8String(byte[] bytes) {
-        return new String(bytes, UTF_8);
+    public static String toUtf8String(byte @Nullable [] bytes) {
+        return bytes == null ? "null" : new String(bytes, UTF_8);
     }
 
     /** Asserts that the compaction revision is less than the current storage 
revision. */
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index d288d7b6bd..5e792e495f 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -866,7 +866,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
         rwLock.readLock().lock();
 
         try {
-            return range(keyFrom, keyTo, rev);
+            return doRange(keyFrom, keyTo, rev);
         } finally {
             rwLock.readLock().unlock();
         }
@@ -877,86 +877,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
         rwLock.readLock().lock();
 
         try {
-            var readOpts = new ReadOptions();
-
-            var upperBound = keyTo == null ? null : new Slice(keyTo);
-
-            readOpts.setIterateUpperBound(upperBound);
-
-            RocksIterator iterator = index.newIterator(readOpts);
-
-            iterator.seek(keyFrom);
-
-            return new RocksIteratorAdapter<>(iterator) {
-                /** Cached entry used to filter "empty" values. */
-                @Nullable
-                private Entry next;
-
-                @Override
-                public boolean hasNext() {
-                    if (next != null) {
-                        return true;
-                    }
-
-                    while (next == null && super.hasNext()) {
-                        Entry nextCandidate = decodeEntry(it.key(), 
it.value());
-
-                        it.next();
-
-                        if (!nextCandidate.empty()) {
-                            next = nextCandidate;
-
-                            return true;
-                        }
-                    }
-
-                    return false;
-                }
-
-                @Override
-                public Entry next() {
-                    if (!hasNext()) {
-                        throw new NoSuchElementException();
-                    }
-
-                    Entry result = next;
-
-                    assert result != null;
-
-                    next = null;
-
-                    return result;
-                }
-
-                @Override
-                protected Entry decodeEntry(byte[] key, byte[] value) {
-                    long[] revisions = getAsLongs(value);
-
-                    long targetRevision = maxRevision(revisions, 
revUpperBound);
-
-                    if (targetRevision == -1) {
-                        return EntryImpl.empty(key);
-                    }
-
-                    // This is not a correct approach for using locks in terms 
of compaction correctness (we should block compaction for the
-                    // whole iteration duration). However, compaction is not 
fully implemented yet, so this lock is taken for consistency
-                    // sake. This part must be rewritten in the future.
-                    rwLock.readLock().lock();
-
-                    try {
-                        return doGetValue(key, targetRevision);
-                    } finally {
-                        rwLock.readLock().unlock();
-                    }
-                }
-
-                @Override
-                public void close() {
-                    super.close();
-
-                    RocksUtils.closeAll(readOpts, upperBound);
-                }
-            };
+            return doRange(keyFrom, keyTo, revUpperBound);
         } finally {
             rwLock.readLock().unlock();
         }
@@ -1236,59 +1157,6 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
         }
     }
 
-    /**
-     * Returns maximum revision which must be less or equal to {@code 
upperBoundRev}. If there is no such revision then {@code -1} will be
-     * returned.
-     *
-     * @param revs          Revisions list.
-     * @param upperBoundRev Revision upper bound.
-     * @return Maximum revision or {@code -1} if there is no such revision.
-     */
-    private static long maxRevision(long[] revs, long upperBoundRev) {
-        for (int i = revs.length - 1; i >= 0; i--) {
-            long rev = revs[i];
-
-            if (rev <= upperBoundRev) {
-                return rev;
-            }
-        }
-
-        return -1;
-    }
-
-    /**
-     * Gets the value by a key and a revision.
-     *
-     * @param key      Target key.
-     * @param revision Target revision.
-     * @return Entry.
-     */
-    private Entry doGetValue(byte[] key, long revision) {
-        if (revision == 0) {
-            return EntryImpl.empty(key);
-        }
-
-        byte[] valueBytes;
-
-        try {
-            valueBytes = data.get(keyToRocksKey(revision, key));
-        } catch (RocksDBException e) {
-            throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        }
-
-        if (valueBytes == null || valueBytes.length == 0) {
-            return EntryImpl.empty(key);
-        }
-
-        Value lastVal = bytesToValue(valueBytes);
-
-        if (lastVal.tombstone()) {
-            return EntryImpl.tombstone(key, revision, 
lastVal.operationTimestamp());
-        }
-
-        return new EntryImpl(key, lastVal.bytes(), revision, 
lastVal.operationTimestamp());
-    }
-
     /**
      * Adds an entry to the batch.
      *
@@ -1767,4 +1635,87 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
             );
         }
     }
+
+    private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo, 
long revUpperBound) {
+        assert revUpperBound >= 0 : revUpperBound;
+
+        
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
 compactionRevision);
+
+        var readOpts = new ReadOptions();
+
+        Slice upperBound = keyTo == null ? null : new Slice(keyTo);
+
+        readOpts.setIterateUpperBound(upperBound);
+
+        RocksIterator iterator = index.newIterator(readOpts);
+
+        iterator.seek(keyFrom);
+
+        return new RocksIteratorAdapter<>(iterator) {
+            /** Cached entry used to filter "empty" values. */
+            private @Nullable Entry next;
+
+            @Override
+            public boolean hasNext() {
+                if (next != null) {
+                    return true;
+                }
+
+                while (next == null && super.hasNext()) {
+                    Entry nextCandidate = decodeEntry(it.key(), it.value());
+
+                    it.next();
+
+                    if (!nextCandidate.empty()) {
+                        next = nextCandidate;
+
+                        return true;
+                    }
+                }
+
+                return false;
+            }
+
+            @Override
+            public Entry next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+
+                Entry result = next;
+
+                assert result != null;
+
+                next = null;
+
+                return result;
+            }
+
+            @Override
+            protected Entry decodeEntry(byte[] key, byte[] keyRevisionsBytes) {
+                long[] keyRevisions = getAsLongs(keyRevisionsBytes);
+
+                int maxRevisionIndex = maxRevisionIndex(keyRevisions, 
revUpperBound);
+
+                if (maxRevisionIndex == NOT_FOUND) {
+                    return EntryImpl.empty(key);
+                }
+
+                long revision = keyRevisions[maxRevisionIndex];
+
+                // According to the compaction algorithm, we will start it 
locally on a new compaction revision only when all cursors are
+                // completed strictly before it. Therefore, during normal 
operation, we should not get an error here.
+                Value value = getValueForOperation(key, revision);
+
+                return EntryImpl.toEntry(key, revision, value);
+            }
+
+            @Override
+            public void close() {
+                super.close();
+
+                RocksUtils.closeAll(readOpts, upperBound);
+            }
+        };
+    }
 }
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index c3e10a8316..66324d84ce 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
 import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.Cursor;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -688,6 +689,39 @@ public abstract class AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertDoesNotThrowsCompactedExceptionForGetList(NOT_EXISTS_KEY, 7, 7);
     }
 
+    @Test
+    void testRangeLatestAndCompaction() {
+        storage.setCompactionRevision(6);
+
+        assertDoesNotThrow(() -> {
+            try (Cursor<Entry> cursor = storage.range(FOO_KEY, null)) {
+                cursor.hasNext();
+                cursor.next();
+            }
+        });
+    }
+
+    @Test
+    void testRangeAndCompaction() {
+        try (Cursor<Entry> cursorBeforeSetCompactionRevision = 
storage.range(FOO_KEY, null, 5)) {
+            storage.setCompactionRevision(5);
+
+            assertThrows(CompactedException.class, () -> 
storage.range(FOO_KEY, null, 1));
+            assertThrows(CompactedException.class, () -> 
storage.range(FOO_KEY, null, 3));
+            assertThrows(CompactedException.class, () -> 
storage.range(FOO_KEY, null, 5));
+
+            assertDoesNotThrow(() -> {
+                try (Cursor<Entry> range = storage.range(FOO_KEY, null, 6)) {
+                    range.hasNext();
+                    range.next();
+                }
+            });
+
+            assertDoesNotThrow(cursorBeforeSetCompactionRevision::hasNext);
+            assertDoesNotThrow(cursorBeforeSetCompactionRevision::next);
+        }
+    }
+
     private List<Integer> collectRevisions(byte[] key) {
         var revisions = new ArrayList<Integer>();
 
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 06bb3e10c0..7d379dedb6 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -386,29 +386,14 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     @Override
     public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
         synchronized (mux) {
-            return range(keyFrom, keyTo, rev);
+            return doRange(keyFrom, keyTo, rev);
         }
     }
 
     @Override
     public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long 
revUpperBound) {
         synchronized (mux) {
-            SortedMap<byte[], List<Long>> subMap = keyTo == null
-                    ? keysIdx.tailMap(keyFrom)
-                    : keysIdx.subMap(keyFrom, keyTo);
-
-            return subMap.entrySet().stream()
-                    .map(e -> {
-                        long targetRevision = maxRevision(e.getValue(), 
revUpperBound);
-
-                        if (targetRevision == -1) {
-                            return EntryImpl.empty(e.getKey());
-                        }
-
-                        return doGetValue(e.getKey(), targetRevision);
-                    })
-                    .filter(e -> !e.empty())
-                    .collect(collectingAndThen(toList(), 
Cursor::fromIterable));
+            return doRange(keyFrom, keyTo, revUpperBound);
         }
     }
 
@@ -529,8 +514,10 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
             return;
         }
 
-        HybridTimestamp ts = revToTsMap.get(updatedEntries.get(0).revision());
-        assert ts != null;
+        long revision = updatedEntries.get(0).revision();
+
+        HybridTimestamp ts = revToTsMap.get(revision);
+        assert ts != null : revision;
 
         watchProcessor.notifyWatches(List.copyOf(updatedEntries), ts);
 
@@ -785,48 +772,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return entries;
     }
 
-    /**
-     * Returns maximum revision which must be less or equal to {@code 
upperBoundRev}. If there is no such revision then {@code -1} will be
-     * returned.
-     *
-     * @param revs Revisions list.
-     * @param upperBoundRev Revision upper bound.
-     * @return Appropriate revision or {@code -1} if there is no such revision.
-     */
-    private static long maxRevision(List<Long> revs, long upperBoundRev) {
-        int i = revs.size() - 1;
-
-        for (; i >= 0; i--) {
-            long rev = revs.get(i);
-
-            if (rev <= upperBoundRev) {
-                return rev;
-            }
-        }
-
-        return -1;
-    }
-
-    private Entry doGetValue(byte[] key, long lastRev) {
-        if (lastRev == 0) {
-            return EntryImpl.empty(key);
-        }
-
-        NavigableMap<byte[], Value> lastRevVals = revsIdx.get(lastRev);
-
-        if (lastRevVals == null || lastRevVals.isEmpty()) {
-            return EntryImpl.empty(key);
-        }
-
-        Value lastVal = lastRevVals.get(key);
-
-        if (lastVal.tombstone()) {
-            return EntryImpl.tombstone(key, lastRev, 
lastVal.operationTimestamp());
-        }
-
-        return new EntryImpl(key, lastVal.bytes(), lastRev, 
lastVal.operationTimestamp());
-    }
-
     private void doPut(byte[] key, byte[] bytes, long curRev, HybridTimestamp 
opTs) {
         // Update keysIdx.
         List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
@@ -983,4 +928,33 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
         return value;
     }
+
+    private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo, 
long revUpperBound) {
+        assert revUpperBound >= 0 : revUpperBound;
+
+        
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
 compactionRevision);
+
+        SortedMap<byte[], List<Long>> subMap = keyTo == null
+                ? keysIdx.tailMap(keyFrom)
+                : keysIdx.subMap(keyFrom, keyTo);
+
+        return subMap.entrySet().stream()
+                .map(e -> {
+                    byte[] key = e.getKey();
+                    long[] keyRevisions = toLongArray(e.getValue());
+
+                    int maxRevisionIndex = 
KeyValueStorageUtils.maxRevisionIndex(keyRevisions, revUpperBound);
+
+                    if (maxRevisionIndex == NOT_FOUND) {
+                        return EntryImpl.empty(key);
+                    }
+
+                    long revision = keyRevisions[maxRevisionIndex];
+                    Value value = getValue(key, revision);
+
+                    return EntryImpl.toEntry(key, revision, value);
+                })
+                .filter(e -> !e.empty())
+                .collect(collectingAndThen(toList(), Cursor::fromIterable));
+    }
 }


Reply via email to