This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 79564ca7e4 IGNITE-23291 Improve the local compaction mechanism of Metastorage (#4465)
79564ca7e4 is described below

commit 79564ca7e4cbc40fc35f9004e1595efaa425c781
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Sat Sep 28 12:54:23 2024 +0300

    IGNITE-23291 Improve the local compaction mechanism of Metastorage (#4465)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../internal/metastorage/MetaStorageManager.java   |  29 +++
 .../metastorage/exceptions/CompactedException.java |  25 ++-
 .../metastorage/impl/MetaStorageManagerImpl.java   |  22 +-
 .../metastorage/impl/MetaStorageService.java       |   8 -
 .../metastorage/impl/MetaStorageServiceImpl.java   |   6 -
 .../metastorage/server/CompactedException.java     |  59 ------
 .../metastorage/server/KeyValueStorage.java        |  33 ++-
 .../metastorage/server/KeyValueStorageUtils.java   |  67 ++++++
 .../server/persistence/RocksDbKeyValueStorage.java | 170 ++++++---------
 .../AbstractCompactionKeyValueStorageTest.java     | 227 +++++++++------------
 .../server/KeyValueStorageUtilsTest.java           |  78 +++++++
 .../server/AbstractKeyValueStorageTest.java        |   2 +-
 .../server/SimpleInMemoryKeyValueStorage.java      | 157 +++++++-------
 modules/platforms/cpp/ignite/common/error_codes.h  |   1 +
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   2 +
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   3 +
 17 files changed, 472 insertions(+), 420 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 4a173d618b..2f15dd0383 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -302,6 +302,9 @@ public class ErrorGroups {
 
         /** Failed to perform an operation within a specified time period. 
Usually in such cases the operation should be retried. */
         public static final int OP_EXECUTION_TIMEOUT_ERR = 
META_STORAGE_ERR_GROUP.registerErrorCode((short) 5);
+
+        /** Failed to perform a read operation on the underlying key value 
storage because the revision has already been compacted. */
+        public static final int COMPACTED_ERR = 
META_STORAGE_ERR_GROUP.registerErrorCode((short) 6);
     }
 
     /** Index error group. */
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 09970056ca..65e0eb9586 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
 import 
org.apache.ignite.internal.metastorage.exceptions.OperationTimeoutException;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.util.Cursor;
@@ -272,4 +274,31 @@ public interface MetaStorageManager extends 
IgniteComponent {
 
     /** Unregisters a Meta Storage revision update listener. */
     void unregisterRevisionUpdateListener(RevisionUpdateListener listener);
+
+    /**
+     * Compacts outdated key versions and removes tombstones of metastorage 
locally.
+     *
+     * <p>We do not compact the only and last version of the key unless it is 
a tombstone.</p>
+     *
+     * <p>Let's look at some examples. Suppose we have the following keys with their versions:</p>
+     * <ul>
+     *     <li>Key "foo" with versions that have revisions (1, 3, 5) - "foo" 
[1, 3, 5].</li>
+     *     <li>Key "bar" with versions that have revisions (1, 2, 5) the last 
revision is a tombstone - "bar" [1, 2, 5 tomb].</li>
+     * </ul>
+     *
+     * <p>Let's look at examples of invoking the current method and what will 
be in the storage after:</p>
+     * <ul>
+     *     <li>Compaction revision is {@code 1}: "foo" [3, 5], "bar" [2, 5 
tomb].</li>
+     *     <li>Compaction revision is {@code 2}: "foo" [3, 5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 3}: "foo" [5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 4}: "foo" [5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 5}: "foo" [5].</li>
+     *     <li>Compaction revision is {@code 6}: "foo" [5].</li>
+     * </ul>
+     *
+     * @param revision Revision up to which (inclusive) the metastorage keys will be compacted.
+     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if the node is in the process of stopping.
+     * @throws MetaStorageException If there is an error during the 
metastorage compaction process.
+     */
+    void compactLocally(long revision);
 }
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/exceptions/CompactedException.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/exceptions/CompactedException.java
index 1b0d01e9c3..3f11f48f65 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/exceptions/CompactedException.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/exceptions/CompactedException.java
@@ -17,18 +17,29 @@
 
 package org.apache.ignite.internal.metastorage.exceptions;
 
-import static org.apache.ignite.lang.ErrorGroups.MetaStorage.COMPACTION_ERR;
+import static org.apache.ignite.lang.ErrorGroups.MetaStorage.COMPACTED_ERR;
 
 /**
- * Thrown when a requested operation on meta storage could not be performed 
because target revisions were removed from storage due to a
- * compaction procedure. In such case the operation should be retried with 
actual revision.
+ * Thrown when a requested read operation on meta storage could not be 
performed because target revisions were removed from storage due to
+ * a compaction procedure. In such case the operation should be retried with 
actual revision.
  */
 public class CompactedException extends MetaStorageException {
+    private static final long serialVersionUID = -6849399873850280288L;
+
     /**
      * Constructs an exception.
      */
     public CompactedException() {
-        super(COMPACTION_ERR);
+        super(COMPACTED_ERR);
+    }
+
+    /**
+     * Constructs an exception for a requested revision that has already been compacted.
+     *
+     * @param revision Requested revision.
+     */
+    public CompactedException(long revision) {
+        super(COMPACTED_ERR, "Requested revision has already been compacted: " 
+ revision);
     }
 
     /**
@@ -37,7 +48,7 @@ public class CompactedException extends MetaStorageException {
      * @param message Detail message.
      */
     public CompactedException(String message) {
-        super(COMPACTION_ERR, message);
+        super(COMPACTED_ERR, message);
     }
 
     /**
@@ -47,7 +58,7 @@ public class CompactedException extends MetaStorageException {
      * @param cause   Cause.
      */
     public CompactedException(String message, Throwable cause) {
-        super(COMPACTION_ERR, message, cause);
+        super(COMPACTED_ERR, message, cause);
     }
 
     /**
@@ -56,6 +67,6 @@ public class CompactedException extends MetaStorageException {
      * @param cause Cause.
      */
     public CompactedException(Throwable cause) {
-        super(COMPACTION_ERR, cause);
+        super(COMPACTED_ERR, cause);
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 6ddca448ab..fa7f9f6b0a 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -958,23 +958,6 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         }
     }
 
-    /**
-     * Compacts Meta storage (removes all tombstone entries and old entries 
except of entries with latest revision).
-     *
-     * @see MetaStorageService#compact()
-     */
-    public CompletableFuture<Void> compact() {
-        if (!busyLock.enterBusy()) {
-            return failedFuture(new NodeStoppingException());
-        }
-
-        try {
-            return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
     private void onSafeTimeAdvanced(HybridTimestamp time) {
         assert time != null;
 
@@ -1147,4 +1130,9 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
             return S.toString(this);
         }
     }
+
+    @Override
+    public void compactLocally(long revision) {
+        inBusyLock(busyLock, () -> storage.compact(revision));
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
index 66bb7c57ab..563a025b7e 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
@@ -248,14 +248,6 @@ public interface MetaStorageService extends 
ManuallyCloseable {
      */
     Publisher<Entry> prefix(ByteArray prefix, long revUpperBound);
 
-    /**
-     * Compacts meta storage (removes all tombstone entries and old entries 
except of entries with latest revision).
-     *
-     * @return Completed future. Couldn't be {@code null}.
-     * @throws OperationTimeoutException If the operation is timed out. Will 
be thrown on getting future result.
-     */
-    CompletableFuture<Void> compact();
-
     /**
      * Returns a future which will hold current revision of the metastorage 
leader.
      */
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 3dc67a4012..1ec59b36cf 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -257,12 +257,6 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
         return context.raftService().run(syncTimeCommand);
     }
 
-    // TODO: IGNITE-19417 Implement.
-    @Override
-    public CompletableFuture<Void> compact() {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public CompletableFuture<Long> currentRevision() {
         GetCurrentRevisionCommand cmd = 
context.commandsFactory().getCurrentRevisionCommand().build();
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/CompactedException.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/CompactedException.java
deleted file mode 100644
index 07f18a18f0..0000000000
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/CompactedException.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.server;
-
-/**
- * Thrown when a requested operation on meta storage could not be performed 
because target revisions were removed from storage due to a
- * compaction procedure. In such case the operation should be retried with 
actual revision.
- */
-public class CompactedException extends RuntimeException {
-    /**
-     * Constructs an exception.
-     */
-    public CompactedException() {
-        super();
-    }
-
-    /**
-     * Constructs an exception with a given message.
-     *
-     * @param message Detail message.
-     */
-    public CompactedException(String message) {
-        super(message);
-    }
-
-    /**
-     * Constructs an exception with a given message and a cause.
-     *
-     * @param message Detail message.
-     * @param cause   Cause.
-     */
-    public CompactedException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * Constructs an exception with a given cause.
-     *
-     * @param cause Cause.
-     */
-    public CompactedException(Throwable cause) {
-        super(cause);
-    }
-}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 3159069f15..454b455a1c 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -30,6 +30,7 @@ import 
org.apache.ignite.internal.metastorage.RevisionUpdateListener;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
@@ -233,13 +234,33 @@ public interface KeyValueStorage extends 
ManuallyCloseable {
     void removeWatch(WatchListener listener);
 
     /**
-     * Compacts storage (removes tombstones).
+     * Compacts outdated key versions and removes tombstones of metastorage 
locally.
      *
-     * @param lowWatermark A time threshold for the entry. Only entries that 
have revisions with timestamp higher or equal to the
-     *     watermark can be removed.
+     * <p>We do not compact the only and last version of the key unless it is 
a tombstone.</p>
+     *
+     * <p>Let's look at some examples. Suppose we have the following keys with their versions:</p>
+     * <ul>
+     *     <li>Key "foo" with versions that have revisions (1, 3, 5) - "foo" 
[1, 3, 5].</li>
+     *     <li>Key "bar" with versions that have revisions (1, 2, 5) the last 
revision is a tombstone - "bar" [1, 2, 5 tomb].</li>
+     * </ul>
+     *
+     * <p>Let's look at examples of invoking the current method and what will 
be in the storage after:</p>
+     * <ul>
+     *     <li>Compaction revision is {@code 1}: "foo" [3, 5], "bar" [2, 5 
tomb].</li>
+     *     <li>Compaction revision is {@code 2}: "foo" [3, 5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 3}: "foo" [5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 4}: "foo" [5], "bar" [5 
tomb].</li>
+     *     <li>Compaction revision is {@code 5}: "foo" [5].</li>
+     *     <li>Compaction revision is {@code 6}: "foo" [5].</li>
+     * </ul>
+     *
+     * <p>Compaction revision is expected to be less than the {@link #revision 
current storage revision}.</p>
+     *
+     * @param revision Revision up to which (inclusive) the metastorage keys will be compacted.
+     * @throws MetaStorageException If there is an error during the 
metastorage compaction process.
      */
-    // TODO: IGNITE-19417 Provide low-watermark for compaction.
-    void compact(HybridTimestamp lowWatermark);
+    // TODO: IGNITE-23281 Do not hold write lock for the entire operation
+    void compact(long revision);
 
     /**
      * Creates a snapshot of the storage's current state in the specified 
directory.
@@ -268,6 +289,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
      * @param revision Revision by which to do a lookup.
      * @return Timestamp corresponding to the revision.
      */
+    // TODO: IGNITE-23307 Figure out what to do after compaction
     HybridTimestamp timestampByRevision(long revision);
 
     /**
@@ -276,6 +298,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
      * @param timestamp Timestamp by which to do a lookup.
      * @return Revision lesser or equal to the timestamp or -1 if there is no 
such revision.
      */
+    // TODO: IGNITE-23307 Figure out what to do after compaction
     long revisionByTimestamp(HybridTimestamp timestamp);
 
     /**
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
new file mode 100644
index 0000000000..41c9a55cdd
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.binarySearch;
+
+import java.util.function.LongPredicate;
+
+/** Helper class with useful methods and constants for {@link KeyValueStorage} 
implementations. */
+public class KeyValueStorageUtils {
+    /** Special value indicating that there are no key revisions that need to 
be compacted. */
+    public static final int NOTHING_TO_COMPACT_INDEX = -1;
+
+    /**
+     * Calculates the revision index in key revisions up to which compaction 
is needed or {@link #NOTHING_TO_COMPACT_INDEX} if nothing
+     * needs to be compacted.
+     *
+     * <p>If the returned index points to the last revision and if the last 
revision is <b>not</b> a tombstone, then the returned index is
+     * decremented by 1.</p>
+     *
+     * @param keyRevisions Metastorage key revisions in ascending order.
+     * @param compactionRevisionInclusive Revision up to which you need to 
compact (inclusive).
+     * @param isTombstone Predicate to test whether a key revision is a 
tombstone.
+     */
+    public static int indexToCompact(long[] keyRevisions, long 
compactionRevisionInclusive, LongPredicate isTombstone) {
+        int i = binarySearch(keyRevisions, compactionRevisionInclusive);
+
+        if (i < 0) {
+            if (i == -1) {
+                return NOTHING_TO_COMPACT_INDEX;
+            }
+
+            i = -(i + 2);
+        }
+
+        if (i == keyRevisions.length - 1 && 
!isTombstone.test(keyRevisions[i])) {
+            i = i == 0 ? NOTHING_TO_COMPACT_INDEX : i - 1;
+        }
+
+        return i;
+    }
+
+    /**
+     * Converts bytes to UTF-8 string.
+     *
+     * @param bytes Bytes.
+     */
+    public static String toUtf8String(byte[] bytes) {
+        return new String(bytes, UTF_8);
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index d7a7346d95..ee1fb66078 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.metastorage.server.persistence;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
 import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.appendLong;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToLong;
@@ -44,7 +48,6 @@ import static 
org.apache.ignite.lang.ErrorGroups.MetaStorage.STARTING_STORAGE_ER
 import static org.rocksdb.util.SizeUnit.MB;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -136,13 +139,13 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     /** Revision key. */
     private static final byte[] REVISION_KEY = keyToRocksKey(
             SYSTEM_REVISION_MARKER_VALUE,
-            "SYSTEM_REVISION_KEY".getBytes(StandardCharsets.UTF_8)
+            "SYSTEM_REVISION_KEY".getBytes(UTF_8)
     );
 
     /** Update counter key. */
     private static final byte[] UPDATE_COUNTER_KEY = keyToRocksKey(
             SYSTEM_REVISION_MARKER_VALUE,
-            "SYSTEM_UPDATE_COUNTER_KEY".getBytes(StandardCharsets.UTF_8)
+            "SYSTEM_UPDATE_COUNTER_KEY".getBytes(UTF_8)
     );
 
     /** Lexicographic order comparator. */
@@ -971,23 +974,21 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     }
 
     @Override
-    public void compact(HybridTimestamp lowWatermark) {
-        rwLock.writeLock().lock();
+    public void compact(long revision) {
+        assert revision >= 0;
 
+        rwLock.writeLock().lock();
 
         try (WriteBatch batch = new WriteBatch()) {
-            // Find a revision with timestamp lesser or equal to the watermark.
-            long maxRevision = revisionByTimestamp(lowWatermark);
-
-            if (maxRevision == -1) {
-                // Nothing to compact yet.
-                return;
-            }
+            assert revision < rev : String.format(
+                    "Compaction revision should be less than the current: 
[compaction=%s, current=%s]",
+                    revision, rev
+            );
 
             try (RocksIterator iterator = index.newIterator()) {
                 iterator.seekToFirst();
 
-                RocksUtils.forEach(iterator, (key, value) -> 
compactForKey(batch, key, getAsLongs(value), maxRevision));
+                RocksUtils.forEach(iterator, (key, value) -> 
compactForKey(batch, key, getAsLongs(value), revision));
             }
 
             fillAndWriteBatch(batch, rev, updCntr, null);
@@ -1026,104 +1027,41 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     }
 
     /**
-     * Compacts all entries by the given key, removing revision that are no 
longer needed.
-     * Last entry with a revision lesser or equal to the {@code 
minRevisionToKeep} and all consecutive entries will be preserved.
-     * If the first entry to keep is a tombstone, it will be removed.
-     * Example:
-     * <pre>
-     * Example 1:
-     *     put entry1: revision 5
-     *     put entry2: revision 7
-     *
-     *     do compaction: revision 6
-     *
-     *     entry1: exists
-     *     entry2: exists
-     *
-     * Example 2:
-     *     put entry1: revision 5
-     *     put entry2: revision 7
-     *
-     *     do compaction: revision 7
-     *
-     *     entry1: doesn't exist
-     *     entry2: exists
-     * </pre>
+     * Compacts the key, see the documentation of {@link 
KeyValueStorage#compact} for examples.
      *
      * @param batch Write batch.
-     * @param key   Target key.
-     * @param revs  Revisions.
-     * @param minRevisionToKeep Minimum revision that should be kept.
-     * @throws RocksDBException If failed.
+     * @param key Target key.
+     * @param revs Key revisions.
+     * @param compactionRevision Revision up to which (inclusively) the key 
will be compacted.
+     * @throws MetaStorageException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long 
minRevisionToKeep) throws RocksDBException {
-        if (revs.length < 2) {
-            // If we have less than two revisions, there is no point in 
compaction.
-            return;
-        }
-
-        // Index of the first revision we will be keeping in the array of 
revisions.
-        int idxToKeepFrom = 0;
-
-        // Whether there is an entry with the minRevisionToKeep.
-        boolean hasMinRevision = false;
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long 
compactionRevision) {
+        try {
+            int indexToCompact = indexToCompact(revs, compactionRevision, 
revision -> isTombstoneForCompaction(key, revision));
 
-        // Traverse revisions, looking for the first revision that needs to be 
kept.
-        for (long rev : revs) {
-            if (rev >= minRevisionToKeep) {
-                if (rev == minRevisionToKeep) {
-                    hasMinRevision = true;
-                }
-                break;
+            if (NOTHING_TO_COMPACT_INDEX == indexToCompact) {
+                return;
             }
 
-            idxToKeepFrom++;
-        }
-
-        if (!hasMinRevision) {
-            // Minimal revision was not encountered, that mean that we are 
between revisions of a key, so previous revision
-            // must be preserved.
-            idxToKeepFrom--;
-        }
-
-        if (idxToKeepFrom <= 0) {
-            // All revisions are still in use.
-            return;
-        }
-
-        for (int i = 0; i < idxToKeepFrom; i++) {
-            // This revision is not needed anymore, remove data.
-            data.delete(batch, keyToRocksKey(revs[i], key));
-        }
-
-        // Whether we only have last revision (even if it's lesser or equal to 
watermark).
-        boolean onlyLastRevisionLeft = idxToKeepFrom == (revs.length - 1);
-
-        // Get the number of the first revision that will be kept.
-        long rev = onlyLastRevisionLeft ? lastRevision(revs) : 
revs[idxToKeepFrom];
-
-        byte[] rocksKey = keyToRocksKey(rev, key);
-
-        Value value = bytesToValue(data.get(rocksKey));
-
-        if (value.tombstone()) {
-            // The first revision we are going to keep is a tombstone, we may 
delete it.
-            data.delete(batch, rocksKey);
-
-            if (!onlyLastRevisionLeft) {
-                // First revision was a tombstone, but there are other 
revisions, that need to be kept,
-                // so advance index of the first revision we need to keep.
-                idxToKeepFrom++;
+            for (int revisionIndex = 0; revisionIndex <= indexToCompact; 
revisionIndex++) {
+                // This revision is not needed anymore, remove data.
+                data.delete(batch, keyToRocksKey(revs[revisionIndex], key));
             }
-        }
 
-        if (onlyLastRevisionLeft && value.tombstone()) {
-            // We don't have any previous revisions for this entry and the 
single existing is a tombstone,
-            // so we can remove it from index.
-            index.delete(batch, key);
-        } else {
-            // Keeps revisions starting with idxToKeepFrom.
-            index.put(batch, key, longsToBytes(revs, idxToKeepFrom));
+            if (indexToCompact == revs.length - 1) {
+                index.delete(batch, key);
+            } else {
+                index.put(batch, key, longsToBytes(revs, indexToCompact + 1));
+            }
+        } catch (Throwable t) {
+            throw new MetaStorageException(
+                    COMPACTION_ERR,
+                    String.format(
+                            "Error during compaction of key: [KeyBytes=%s, 
keyBytesToUtf8String=%s]",
+                            Arrays.toString(key), toUtf8String(key)
+                    ),
+                    t
+            );
         }
     }
 
@@ -1194,7 +1132,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         assert revUpperBound >= 0 : "Invalid arguments: [revUpperBound=" + 
revUpperBound + ']';
         assert revUpperBound >= revLowerBound
                 : "Invalid arguments: [revLowerBound=" + revLowerBound + ", 
revUpperBound=" + revUpperBound + ']';
-        // TODO: IGNITE-19782 throw CompactedException if revLowerBound is 
compacted.
 
         long[] revs;
 
@@ -1633,4 +1570,29 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
             rwLock.writeLock().unlock();
         }
     }
+
+    private boolean isTombstone(byte[] key, long revision) throws 
RocksDBException {
+        byte[] rocksKey = keyToRocksKey(revision, key);
+
+        byte[] valueBytes = data.get(rocksKey);
+
+        assert valueBytes != null : "key=" + toUtf8String(key) + ", revision=" 
+ revision;
+
+        return bytesToValue(valueBytes).tombstone();
+    }
+
+    private boolean isTombstoneForCompaction(byte[] key, long revision) {
+        try {
+            return isTombstone(key, revision);
+        } catch (RocksDBException e) {
+            throw new MetaStorageException(
+                    COMPACTION_ERR,
+                    String.format(
+                            "Error getting key value by revision: 
[KeyBytes=%s, keyBytesToUtf8String=%s, revision=%s]",
+                            Arrays.toString(key), toUtf8String(key), revision
+                    ),
+                    e
+            );
+        }
+    }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index 9ccc44caa8..4eef4a8aed 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -17,186 +17,147 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /** Compaction tests. */
 public abstract class AbstractCompactionKeyValueStorageTest extends 
AbstractKeyValueStorageTest {
-    private final HybridClock clock = new HybridClockImpl();
+    private static final byte[] FOO_KEY = fromString("foo");
 
-    @Test
-    public void testCompactionAfterLastRevision() {
-        byte[] key = key(0);
-        byte[] value1 = keyValue(0, 0);
-        byte[] value2 = keyValue(0, 1);
+    private static final byte[] BAR_KEY = fromString("bar");
 
-        storage.put(key, value1, clock.now());
-        storage.put(key, value2, clock.now());
+    private static final byte[] SOME_KEY = fromString("someKey");
 
-        long lastRevision = storage.revision();
+    private static final byte[] SOME_VALUE = fromString("someValue");
 
-        storage.compact(clock.now());
+    private final HybridClock clock = new HybridClockImpl();
 
-        // Latest value, must exist.
-        Entry entry2 = storage.get(key, lastRevision);
-        assertEquals(lastRevision, entry2.revision());
-        assertArrayEquals(value2, entry2.value());
+    @Override
+    @BeforeEach
+    void setUp() {
+        super.setUp();
 
-        // Previous value, must be removed due to compaction.
-        Entry entry1 = storage.get(key, lastRevision - 1);
-        assertTrue(entry1.empty());
-    }
+        storage.putAll(List.of(FOO_KEY, BAR_KEY), List.of(SOME_VALUE, 
SOME_VALUE), clock.now()); // revision 1: FOO_KEY and BAR_KEY
+        storage.put(BAR_KEY, SOME_VALUE, clock.now()); // revision 2
+        storage.put(FOO_KEY, SOME_VALUE, clock.now()); // revision 3
+        storage.put(SOME_KEY, SOME_VALUE, clock.now()); // revision 4
 
-    @Test
-    public void testCompactionAfterTombstone() {
-        byte[] key = key(0);
-        byte[] value = keyValue(0, 0);
+        var fooKey = new ByteArray(FOO_KEY);
+        var barKey = new ByteArray(BAR_KEY);
 
-        storage.put(key, value, clock.now());
-        storage.remove(key, clock.now());
+        var iif = new If(
+                new AndCondition(new ExistenceCondition(Type.EXISTS, FOO_KEY), 
new ExistenceCondition(Type.EXISTS, BAR_KEY)),
+                new Statement(ops(put(fooKey, SOME_VALUE), 
remove(barKey)).yield()),
+                new Statement(ops(noop()).yield())
+        );
 
-        long lastRevision = storage.revision();
+        storage.invoke(iif, clock.now(), new 
CommandIdGenerator(UUID::randomUUID).newId()); // revision 5: FOO_KEY is put, BAR_KEY is removed
 
-        storage.compact(clock.now());
+        storage.remove(SOME_KEY, clock.now()); // revision 6
 
-        // Current value, must be removed due to being a tombstone.
-        Entry entry2 = storage.get(key, lastRevision);
-        assertTrue(entry2.empty());
+        // Extra update of an unrelated key: moves the storage revision past 6 so that compact(6) still targets a revision below the current one.
+        storage.put(fromString("fake"), SOME_VALUE, clock.now());
 
-        // Previous value, must be removed due to compaction.
-        Entry entry1 = storage.get(key, lastRevision - 1);
-        assertTrue(entry1.empty());
+        assertEquals(List.of(1, 3, 5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(1, 2, 5), collectRevisions(BAR_KEY));
+        assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
     @Test
-    public void testCompactionBetweenMultipleWrites() {
-        byte[] key = key(0);
-        byte[] value1 = keyValue(0, 0);
-        byte[] value2 = keyValue(0, 1);
-        byte[] value3 = keyValue(0, 2);
-        byte[] value4 = keyValue(0, 3);
-
-        storage.put(key, value1, clock.now());
-        storage.put(key, value2, clock.now());
-
-        HybridTimestamp compactTs = clock.now();
-
-        storage.put(key, value3, clock.now());
-        storage.put(key, value4, clock.now());
-
-        long lastRevision = storage.revision();
+    void testCompactRevision1() {
+        storage.compact(1);
 
-        storage.compact(compactTs);
-
-        Entry entry4 = storage.get(key, lastRevision);
-        assertArrayEquals(value4, entry4.value());
-
-        Entry entry3 = storage.get(key, lastRevision - 1);
-        assertArrayEquals(value3, entry3.value());
-
-        Entry entry2 = storage.get(key, lastRevision - 2);
-        assertArrayEquals(value2, entry2.value());
-
-        // Previous value, must be removed due to compaction.
-        Entry entry1 = storage.get(key, lastRevision - 3);
-        assertTrue(entry1.empty());
+        assertEquals(List.of(3, 5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(2, 5), collectRevisions(BAR_KEY));
+        assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
     @Test
-    public void testCompactionAfterTombstoneRemovesTombstone() {
-        byte[] key = key(0);
-        byte[] value1 = keyValue(0, 0);
-        byte[] value2 = keyValue(0, 1);
-
-        storage.put(key, value1, clock.now());
-
-        storage.remove(key, clock.now());
-
-        HybridTimestamp compactTs = clock.now();
-
-        storage.put(key, value2, clock.now());
+    void testCompactRevision2() {
+        storage.compact(2);
 
-        storage.remove(key, clock.now());
-
-        long lastRevision = storage.revision();
-
-        storage.compact(compactTs);
-
-        // Last operation was remove, so this is a tombstone.
-        Entry entry4 = storage.get(key, lastRevision);
-        assertTrue(entry4.tombstone());
-
-        Entry entry3 = storage.get(key, lastRevision - 1);
-        assertArrayEquals(value2, entry3.value());
-
-        // Previous value, must be removed due to compaction.
-        Entry entry2 = storage.get(key, lastRevision - 2);
-        assertTrue(entry2.empty());
-
-        Entry entry1 = storage.get(key, lastRevision - 3);
-        assertTrue(entry1.empty());
+        assertEquals(List.of(3, 5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(5), collectRevisions(BAR_KEY));
+        assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
     @Test
-    public void testCompactEmptyStorage() {
-        storage.compact(clock.now());
+    void testCompactRevision3() {
+        storage.compact(3);
+
+        assertEquals(List.of(5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(5), collectRevisions(BAR_KEY));
+        assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
     }
 
     @Test
-    public void testCompactionBetweenRevisionsOfOneKey() {
-        byte[] key = key(0);
-        byte[] value11 = keyValue(0, 0);
-        byte[] value12 = keyValue(0, 1);
-
-        storage.put(key, value11, clock.now());
+    void testCompactRevision4() {
+        storage.compact(4);
 
-        byte[] key2 = key(1);
-        byte[] value2 = keyValue(1, 0);
-        storage.put(key2, value2, clock.now());
-
-        HybridTimestamp compactTs = clock.now();
-
-        storage.put(key, value12, clock.now());
+        assertEquals(List.of(5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(5), collectRevisions(BAR_KEY));
+        assertEquals(List.of(6), collectRevisions(SOME_KEY));
+    }
 
-        storage.compact(compactTs);
+    @Test
+    void testCompactRevision5() {
+        storage.compact(5);
 
-        // Both keys should exist, as low watermark's revision is higher than 
entry11's, but lesser than entry12's,
-        // this means that entry1 is still needed.
-        Entry entry12 = storage.get(key, storage.revision());
-        assertArrayEquals(value12, entry12.value());
+        assertEquals(List.of(5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(), collectRevisions(BAR_KEY));
+        assertEquals(List.of(6), collectRevisions(SOME_KEY));
+    }
 
-        Entry entry11 = storage.get(key, storage.revision() - 1);
-        assertArrayEquals(value11, entry11.value());
+    @Test
+    void testCompactRevision6() {
+        storage.compact(6);
 
-        Entry entry2 = storage.get(key2, storage.revision());
-        assertArrayEquals(value2, entry2.value());
+        assertEquals(List.of(5), collectRevisions(FOO_KEY));
+        assertEquals(List.of(), collectRevisions(BAR_KEY));
+        assertEquals(List.of(), collectRevisions(SOME_KEY));
     }
 
     @Test
-    public void testInvokeCompactionBeforeAnyEntry() {
-        byte[] key = key(0);
-        byte[] value1 = keyValue(0, 0);
-        byte[] value2 = keyValue(0, 1);
+    void testCompactRevisionSequentially() {
+        testCompactRevision1();
+        testCompactRevision2();
+        testCompactRevision3();
+        testCompactRevision4();
+        testCompactRevision5();
+        testCompactRevision6();
+    }
 
-        HybridTimestamp compactTs = clock.now();
+    private List<Integer> collectRevisions(byte[] key) {
+        var revisions = new ArrayList<Integer>();
 
-        storage.put(key, value1, clock.now());
-        storage.put(key, value2, clock.now());
+        for (int revision = 0; revision <= storage.revision(); revision++) {
+            Entry entry = storage.get(key, revision);
 
-        storage.compact(compactTs);
+            if (!entry.empty() && entry.revision() == revision) {
+                revisions.add(revision);
+            }
+        }
 
-        // No entry should be compacted.
-        Entry entry2 = storage.get(key, storage.revision());
-        assertArrayEquals(value2, entry2.value());
+        return revisions;
+    }
 
-        Entry entry1 = storage.get(key, storage.revision() - 1);
-        assertArrayEquals(value1, entry1.value());
+    private static byte[] fromString(String s) {
+        return s.getBytes(UTF_8);
     }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtilsTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtilsTest.java
new file mode 100644
index 0000000000..241aeb9fbd
--- /dev/null
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtilsTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
+import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link KeyValueStorageUtils}; the last argument of {@code indexToCompact} is used as the "revision is a tombstone" predicate. */
+public class KeyValueStorageUtilsTest {
+    @Test
+    void testIndexToCompactNoRevisions() {
+        assertEquals(NOTHING_TO_COMPACT_INDEX, 
indexToCompact(LONG_EMPTY_ARRAY, 0, revision -> false));
+        assertEquals(NOTHING_TO_COMPACT_INDEX, 
indexToCompact(LONG_EMPTY_ARRAY, 0, revision -> true));
+    }
+
+    @Test
+    void testIndexToCompactSingleRevision() {
+        long[] keyRevisions = {2};
+        // Compaction revision below the only key revision: nothing to compact, tombstone or not.
+        assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 1, 
revision -> false));
+        assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 1, 
revision -> true));
+        // Compaction revision equal to the only (hence last) key revision: it is compacted only if it is a tombstone.
+        assertEquals(0, indexToCompact(keyRevisions, 2, revision -> true));
+        assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 2, 
revision -> false));
+        // Compaction revision above the only key revision: same outcome as for an exact match.
+        assertEquals(0, indexToCompact(keyRevisions, 3, revision -> true));
+        assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 3, 
revision -> false));
+    }
+
+    @Test
+    void testIndexToCompactMultipleRevisions() {
+        long[] keyRevisions = {2, 4, 5};
+        // Below the first key revision: nothing to compact.
+        assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 1, 
revision -> true));
+        assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 1, 
revision -> false));
+        // Exactly the first key revision: index 0, the predicate does not matter for a non-last revision.
+        assertEquals(0, indexToCompact(keyRevisions, 2, revision -> true));
+        assertEquals(0, indexToCompact(keyRevisions, 2, revision -> false));
+        // Between key revisions 2 and 4: the closest lower key revision (index 0) is returned.
+        assertEquals(0, indexToCompact(keyRevisions, 3, revision -> true));
+        assertEquals(0, indexToCompact(keyRevisions, 3, revision -> false));
+        // Exactly a middle key revision: index 1 regardless of the predicate.
+        assertEquals(1, indexToCompact(keyRevisions, 4, revision -> true));
+        assertEquals(1, indexToCompact(keyRevisions, 4, revision -> false));
+        // Exactly the last key revision: it is included only when it is a tombstone, otherwise the previous index is returned.
+        assertEquals(2, indexToCompact(keyRevisions, 5, revision -> true));
+        assertEquals(1, indexToCompact(keyRevisions, 5, revision -> false));
+        // Above the last key revision: same outcome as for the last key revision.
+        assertEquals(2, indexToCompact(keyRevisions, 6, revision -> true));
+        assertEquals(1, indexToCompact(keyRevisions, 6, revision -> false));
+    }
+
+    @Test
+    void testToUtf8String() {
+        assertEquals("foo", toUtf8String("foo".getBytes(UTF_8)));
+    }
+}
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index 73160d3721..5abbcd0104 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -30,7 +30,7 @@ public abstract class AbstractKeyValueStorageTest extends 
BaseIgniteAbstractTest
     protected KeyValueStorage storage;
 
     @BeforeEach
-    public void setUp() {
+    void setUp() {
         storage = createStorage();
 
         storage.start();
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index e1fe414002..0cace48eaa 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -19,9 +19,13 @@ package org.apache.ignite.internal.metastorage.server;
 
 import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
 import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
 import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static org.apache.ignite.lang.ErrorGroups.MetaStorage.OP_EXECUTION_ERR;
 
@@ -40,6 +44,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.LongConsumer;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
@@ -65,8 +70,14 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     /** Lexicographical comparator. */
     private static final Comparator<byte[]> CMP = Arrays::compareUnsigned;
 
-    /** Keys index. Value is the list of all revisions under which entry 
corresponding to the key was modified. */
-    private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);
+    /**
+     * Keys index. Value is the list of all revisions under which entry 
corresponding to the key was modified.
+     *
+     * <p>Concurrent map to avoid {@link 
java.util.ConcurrentModificationException} on compaction.</p>
+     *
+     * <p>Guarded by {@link #mux}.</p>
+     */
+    private final NavigableMap<byte[], List<Long>> keysIdx = new 
ConcurrentSkipListMap<>(CMP);
 
     /** Timestamp to revision mapping. */
     private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
@@ -74,8 +85,14 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     /** Revision to timestamp mapping. */
     private final Map<Long, HybridTimestamp> revToTsMap = new HashMap<>();
 
-    /** Revisions index. Value contains all entries which were modified under 
particular revision. */
-    private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new 
TreeMap<>();
+    /**
+     * Revisions index. Value contains all entries which were modified under 
particular revision.
+     *
+     * <p>Concurrent map to avoid {@link 
java.util.ConcurrentModificationException} on compaction.</p>
+     *
+     * <p>Guarded by {@link #mux}.</p>
+     */
+    private final NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = 
new ConcurrentSkipListMap<>();
 
     /** Revision. Will be incremented for each single-entry or multi-entry 
update operation. */
     private long rev;
@@ -499,23 +516,16 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     }
 
     @Override
-    public void compact(HybridTimestamp lowWatermark) {
-        synchronized (mux) {
-            NavigableMap<byte[], List<Long>> compactedKeysIdx = new 
TreeMap<>(CMP);
+    public void compact(long revision) {
+        assert revision >= 0;
 
-            NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = 
new TreeMap<>();
-
-            long maxRevision = revisionByTimestamp(lowWatermark);
-
-            if (maxRevision == -1) {
-                return;
-            }
-
-            keysIdx.forEach((key, revs) -> compactForKey(key, revs, 
compactedKeysIdx, compactedRevsIdx, maxRevision));
-
-            keysIdx = compactedKeysIdx;
+        synchronized (mux) {
+            assert revision < rev : String.format(
+                    "Compaction revision should be less than the current: 
[compaction=%s, current=%s]",
+                    revision, rev
+            );
 
-            revsIdx = compactedRevsIdx;
+            keysIdx.forEach((key, revs) -> compactForKey(key, 
toLongArray(revs), revision));
         }
     }
 
@@ -547,78 +557,42 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     }
 
     /**
-     * Compacts all entries by the given key, removing revision that are no 
longer needed.
-     * Last entry with a revision lesser or equal to the {@code 
minRevisionToKeep} and all consecutive entries will be preserved.
-     * If the first entry to keep is a tombstone, it will be removed.
+     * Compacts the key, see the documentation of {@link 
KeyValueStorage#compact} for examples.
      *
-     * @param key A key.
-     * @param revs All revisions of a key.
-     * @param compactedKeysIdx Out parameter, revisions that need to be kept 
must be put here.
-     * @param compactedRevsIdx Out parameter, values that need to be kept must 
be put here.
-     * @param minRevisionToKeep Minimum revision that should be kept.
+     * @param key Target key.
+     * @param revs Snapshot of the key revisions taken by the caller before compaction.
+     * @param compactionRevision Revision up to which (inclusively) the key 
will be compacted.
+     * @throws MetaStorageException If failed.
      */
-    private void compactForKey(
-            byte[] key,
-            List<Long> revs,
-            Map<byte[], List<Long>> compactedKeysIdx,
-            Map<Long, NavigableMap<byte[], Value>> compactedRevsIdx,
-            long minRevisionToKeep
-    ) {
-        List<Long> revsToKeep = new ArrayList<>();
+    private void compactForKey(byte[] key, long[] revs, long 
compactionRevision) {
+        int indexToCompact = indexToCompact(revs, compactionRevision, revision 
-> isTombstoneForCompaction(key, revision));
 
-        // Index of the first revision we will be keeping in the array of 
revisions.
-        int idxToKeepFrom = 0;
+        if (indexToCompact == NOTHING_TO_COMPACT_INDEX) {
+            return;
+        }
 
-        // Whether there is an entry with the minRevisionToKeep.
-        boolean hasMinRevision = false;
+        // Key revisions index: drop the revisions at positions [0, indexToCompact], or the whole key if none is left.
+        if (indexToCompact == revs.length - 1) {
+            keysIdx.remove(key);
+        } else {
+            List<Long> keyRevisions = keysIdx.get(key);
 
-        // Traverse revisions, looking for the first revision that needs to be 
kept.
-        for (long rev : revs) {
-            if (rev >= minRevisionToKeep) {
-                if (rev == minRevisionToKeep) {
-                    hasMinRevision = true;
-                }
-                break;
-            }
+            assert keyRevisions != null : toUtf8String(key);
 
-            idxToKeepFrom++;
+            keyRevisions.subList(0, indexToCompact + 1).clear();
         }
 
-        if (!hasMinRevision) {
-            // Minimal revision was not encountered, that mean that we are 
between revisions of a key, so previous revision
-            // must be preserved.
-            idxToKeepFrom--;
-        }
+        // Revisions index: drop the key's value for the same positions [0, indexToCompact], inclusive, so both indexes stay in sync.
+        for (int revisionIndex = 0; revisionIndex <= indexToCompact; 
revisionIndex++) {
+            long revision = revs[revisionIndex];
 
-        for (int i = idxToKeepFrom; i < revs.size(); i++) {
-            long rev = revs.get(i);
-
-            // If this revision is higher than max revision or is the last 
revision, we may need to keep it.
-            NavigableMap<byte[], Value> kv = revsIdx.get(rev);
+            NavigableMap<byte[], Value> valueByKey = revsIdx.get(revision);
 
-            Value value = kv.get(key);
+            valueByKey.remove(key);
 
-            if (i == idxToKeepFrom) {
-                // Check if a first entry to keep is a tombstone.
-                if (value.tombstone()) {
-                    // If this is a first revision we are keeping and it is a 
tombstone, then don't keep it.
-                    continue;
-                }
+            if (valueByKey.isEmpty()) {
+                revsIdx.remove(revision);
             }
-
-            NavigableMap<byte[], Value> compactedKv = 
compactedRevsIdx.computeIfAbsent(
-                    rev,
-                    k -> new TreeMap<>(CMP)
-            );
-
-            // Keep the entry and the revision.
-            compactedKv.put(key, value);
-
-            revsToKeep.add(rev);
-        }
-
-        if (!revsToKeep.isEmpty()) {
-            compactedKeysIdx.put(key, revsToKeep);
         }
     }
 
@@ -660,7 +634,6 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
         assert revUpperBound >= 0 : "Invalid arguments: [revUpperBound=" + 
revUpperBound + ']';
         assert revUpperBound >= revLowerBound
                 : "Invalid arguments: [revLowerBound=" + revLowerBound + ", 
revUpperBound=" + revUpperBound + ']';
-        // TODO: IGNITE-19782 throw CompactedException if revLowerBound is 
compacted.
 
         List<Long> revs = keysIdx.get(key);
 
@@ -861,4 +834,28 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
             watchProcessor.advanceSafeTime(newSafeTime);
         }
     }
+
+    private static long[] toLongArray(List<Long> list) {
+        if (list.isEmpty()) {
+            return LONG_EMPTY_ARRAY;
+        }
+
+        var array = new long[list.size()];
+
+        for (int i = 0; i < array.length; i++) {
+            array[i] = list.get(i);
+        }
+
+        return array;
+    }
+
+    private boolean isTombstoneForCompaction(byte[] key, long revision) {
+        NavigableMap<byte[], Value> kv = revsIdx.get(revision);
+        // NOTE(review): kv is dereferenced without a null check; presumably every revision listed for a key in keysIdx has an entry in revsIdx - confirm.
+        Value value = kv.get(key);
+
+        assert value != null : "key=" + toUtf8String(key) + ", revision=" + 
revision;
+
+        return value.tombstone();
+    }
 }
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index 0f1706c64a..479297a33b 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -112,6 +112,7 @@ enum class code : underlying_t {
     COMPACTION = 0x50003,
     OP_EXECUTION = 0x50004,
     OP_EXECUTION_TIMEOUT = 0x50005,
+    COMPACTED = 0x50006,
 
     // Index group. Group code: 6
     INVALID_INDEX_DEFINITION = 0x60001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 4d767591d4..e4feab83ec 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -181,6 +181,8 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::RESTORING_STORAGE:
         case error::code::COMPACTION:
             return sql_state::SHY000_GENERAL_ERROR;
+        case error::code::COMPACTED:
+            return sql_state::SHY000_GENERAL_ERROR;
 
         // Index group. Group code: 6
         case error::code::INDEX_NOT_FOUND:
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index f5e10723d9..c3f7c7fb09 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -229,6 +229,9 @@ namespace Apache.Ignite
 
             /// <summary> OpExecutionTimeout error. </summary>
             public const int OpExecutionTimeout = (GroupCode << 16) | (5 & 
0xFFFF);
+
+            /// <summary> Compacted error. </summary>
+            public const int Compacted = (GroupCode << 16) | (6 & 0xFFFF);
         }
 
         /// <summary> Index errors. </summary>

Reply via email to