This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 39224e3dfa IGNITE-18020 Add GC support to rocksdb storage (#1619)
39224e3dfa is described below

commit 39224e3dfa464f11d7c7cacb740be5cf299be4ac
Author: Semyon Danilov <[email protected]>
AuthorDate: Mon Feb 6 19:02:41 2023 +0400

    IGNITE-18020 Add GC support to rocksdb storage (#1619)
---
 gradle/libs.versions.toml                          |   2 +-
 modules/network/README.md                          |   6 +-
 .../network/{docs => tech-notes}/network-flow.png  | Bin
 .../network/{docs => tech-notes}/network-flow.puml |   0
 .../network/{docs => tech-notes}/threading-2.png   | Bin
 .../network/{docs => tech-notes}/threading-2.puml  |   0
 modules/network/{docs => tech-notes}/threading.png | Bin
 .../network/{docs => tech-notes}/threading.puml    |   0
 .../internal/storage/MvPartitionStorage.java       |   1 +
 .../ignite/internal/storage/TableRowAndRowId.java  |   2 +-
 .../AbstractMvPartitionStorageConcurrencyTest.java |  36 +--
 .../storage/AbstractMvPartitionStorageGcTest.java  |  14 +-
 .../storage/AbstractMvPartitionStorageTest.java    |   9 +
 .../internal/storage/BaseMvStoragesTest.java       |  49 +--
 .../storage/rocksdb/ColumnFamilyUtils.java         |  27 +-
 .../internal/storage/rocksdb/GarbageCollector.java | 320 ++++++++++++++++++++
 .../storage/rocksdb/PartitionDataHelper.java       | 212 +++++++++++++
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 330 ++++++++++-----------
 .../storage/rocksdb/RocksDbTableStorage.java       |  31 +-
 .../rocksdb/RocksDbMvPartitionStorageGcTest.java   |  44 +++
 .../tech-notes/garbage-collection.md               |  95 ++++++
 21 files changed, 913 insertions(+), 265 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index de9d67aa59..ddc93a2355 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -57,7 +57,7 @@ jsonpath = "2.4.0"
 classgraph = "4.8.110"
 javassist = "3.28.0-GA"
 checker = "3.10.0"
-rocksdb = "7.3.1"
+rocksdb = "7.9.2"
 disruptor = "3.3.7"
 metrics = "4.0.2"
 jctools = "3.3.0"
diff --git a/modules/network/README.md b/modules/network/README.md
index 4aac91b3b0..12645de7b4 100644
--- a/modules/network/README.md
+++ b/modules/network/README.md
@@ -29,9 +29,9 @@ Messages are then passed on to message listeners of the ConnectionManager.
 In case of ClusterService over ScaleCube (see `ScaleCubeClusterServiceFactory`),
 messages are passed down to the ClusterService via the Project Reactor's Sink which enforces a strict order of message handling:
 a new message can't be received by ClusterService until a previous message is **handled** (see [message handling](#message-handling)).
-![Threading](docs/threading.png)
+![Threading](tech-notes/threading.png)
 Message handling can also be offloaded to another thread:
-![Threading](docs/threading-2.png)
+![Threading](tech-notes/threading-2.png)
 Note that in this case the network message would be considered **handled** before it is processed  
 by another thread.
 
@@ -46,4 +46,4 @@ Message is considered **handled** after all the message handlers have been invok
 ## Message's flow example
 Two nodes, Alice and Bob.
 User is sending a message from Alice to Bob within any thread.
-![Network flow between two nodes](docs/network-flow.png)
+![Network flow between two nodes](tech-notes/network-flow.png)
diff --git a/modules/network/docs/network-flow.png b/modules/network/tech-notes/network-flow.png
similarity index 100%
rename from modules/network/docs/network-flow.png
rename to modules/network/tech-notes/network-flow.png
diff --git a/modules/network/docs/network-flow.puml b/modules/network/tech-notes/network-flow.puml
similarity index 100%
rename from modules/network/docs/network-flow.puml
rename to modules/network/tech-notes/network-flow.puml
diff --git a/modules/network/docs/threading-2.png b/modules/network/tech-notes/threading-2.png
similarity index 100%
rename from modules/network/docs/threading-2.png
rename to modules/network/tech-notes/threading-2.png
diff --git a/modules/network/docs/threading-2.puml b/modules/network/tech-notes/threading-2.puml
similarity index 100%
rename from modules/network/docs/threading-2.puml
rename to modules/network/tech-notes/threading-2.puml
diff --git a/modules/network/docs/threading.png b/modules/network/tech-notes/threading.png
similarity index 100%
rename from modules/network/docs/threading.png
rename to modules/network/tech-notes/threading.png
diff --git a/modules/network/docs/threading.puml b/modules/network/tech-notes/threading.puml
similarity index 100%
rename from modules/network/docs/threading.puml
rename to modules/network/tech-notes/threading.puml
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index edfbd2283b..40dc2ea961 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -225,6 +225,7 @@ public interface MvPartitionStorage extends ManuallyCloseable {
      * @param lowWatermark A time threshold for the row. Rows younger than the watermark value will not be removed.
      * @return A pair of table row and row id, where a timestamp of the row is less than or equal to {@code lowWatermark}.
      *      {@code null} if there's no such value.
+     * @throws StorageException If failed to poll element for vacuum.
      */
     default @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
         throw new UnsupportedOperationException("pollForVacuum");
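For context on how this contract is driven by a caller: vacuuming is a drain loop that polls until the storage reports nothing at or below the watermark. A minimal sketch with a hypothetical `VacuumSource` interface standing in for `MvPartitionStorage` (the names and the plain-long watermark are illustrative, not the Ignite API):

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

public class VacuumLoop {
    /** Stand-in for MvPartitionStorage#pollForVacuum: returns null once nothing is at or below the watermark. */
    interface VacuumSource<T> {
        T pollForVacuum(long lowWatermark);
    }

    /** Drains garbage entries until the source reports there is nothing left to vacuum. */
    static <T> int drain(VacuumSource<T> source, long lowWatermark) {
        int removed = 0;
        while (source.pollForVacuum(lowWatermark) != null) {
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        // Fake source backed by a queue of commit timestamps; only entries at or below the watermark are polled.
        Queue<Long> timestamps = new ArrayDeque<>(List.of(5L, 7L, 20L));
        VacuumSource<Long> source = lowWatermark -> {
            Long head = timestamps.peek();
            return head != null && head <= lowWatermark ? timestamps.poll() : null;
        };
        System.out.println(drain(source, 10L)); // 2: entries 5 and 7 are at or below the watermark of 10
    }
}
```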
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TableRowAndRowId.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TableRowAndRowId.java
index 2dc1d5dae6..52d5b60b3b 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TableRowAndRowId.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TableRowAndRowId.java
@@ -24,7 +24,7 @@ import org.jetbrains.annotations.Nullable;
  * Wrapper that holds both {@link TableRow} and {@link RowId}. {@link TableRow} is null for tombstones.
  */
 public class TableRowAndRowId {
-    /** Table row. */
+    /** Table row. {@code null} if tombstone. */
     private final @Nullable TableRow tableRow;
 
     /** Row id. */
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 7e1e8d9d7c..f193c34448 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -19,11 +19,9 @@ package org.apache.ignite.internal.storage;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.TableRow;
-import org.apache.ignite.internal.storage.impl.TestStorageEngine;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
@@ -85,9 +83,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
     void testRegularGcAndRead(AddAndCommit addAndCommit) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-18020
-        assumeTrue(engine instanceof TestStorageEngine);
-
         for (int i = 0; i < REPEATS; i++) {
             HybridTimestamp firstCommitTs = addAndCommit(TABLE_ROW);
 
@@ -108,9 +103,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
     void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-18020
-        assumeTrue(engine instanceof TestStorageEngine);
-
         for (int i = 0; i < REPEATS; i++) {
             HybridTimestamp firstCommitTs = addAndCommit.perform(this, TABLE_ROW);
 
@@ -129,9 +121,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
     void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-18020
-        assumeTrue(engine instanceof TestStorageEngine);
-
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
@@ -153,9 +142,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
     void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-18020
-        assumeTrue(engine instanceof TestStorageEngine);
-
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
@@ -179,9 +165,6 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
     @ParameterizedTest
     @EnumSource(AddAndCommit.class)
     void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-18020
-        assumeTrue(engine instanceof TestStorageEngine);
-
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
@@ -198,6 +181,25 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
         }
     }
 
+    @ParameterizedTest
+    @EnumSource(AddAndCommit.class)
+    void testConcurrentGc(AddAndCommit addAndCommit) {
+        for (int i = 0; i < REPEATS; i++) {
+            addAndCommit.perform(this, TABLE_ROW);
+
+            addAndCommit.perform(this, null);
+
+            runRace(
+                    () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+                    () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+                    () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+                    () -> pollForVacuum(HybridTimestamp.MAX_VALUE)
+            );
+
+            assertNull(storage.closestRowId(ROW_ID));
+        }
+    }
+
     @SuppressWarnings("ResultOfMethodCallIgnored")
     private void scanFirstEntry(HybridTimestamp firstCommitTs) {
         try (var cursor = scan(firstCommitTs)) {
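The new testConcurrentGc above races four pollers and then asserts the storage is empty: the property under test is that concurrent vacuuming hands out each garbage entry exactly once. A self-contained sketch of that race, using a plain thread pool instead of Ignite's runRace helper (all names here are illustrative):

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentGcRace {
    public static void main(String[] args) throws Exception {
        // A shared "GC queue" holding a single garbage entry; a thread-safe poll hands it out at most once.
        ConcurrentLinkedQueue<String> gcQueue = new ConcurrentLinkedQueue<>(List.of("row-1"));
        AtomicInteger vacuumed = new AtomicInteger();

        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < 4; i++) {
            pool.submit(() -> {
                start.await(); // line up all racers before polling
                if (gcQueue.poll() != null) {
                    vacuumed.incrementAndGet();
                }
                return null;
            });
        }
        start.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);

        // Exactly one racer wins the entry; the queue drains exactly once.
        System.out.println(vacuumed.get());    // 1
        System.out.println(gcQueue.isEmpty()); // true
    }
}
```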
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
index d83fdac549..0f4a9990b4 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
@@ -29,14 +29,14 @@ import org.junit.jupiter.api.Test;
 public abstract class AbstractMvPartitionStorageGcTest extends BaseMvPartitionStorageTest {
     @Test
     void testEmptyStorage() {
-        assertNull(storage.pollForVacuum(clock.now()));
+        assertNull(pollForVacuum(clock.now()));
     }
 
     @Test
     void testSingleValueStorage() {
         addAndCommit(TABLE_ROW);
 
-        assertNull(storage.pollForVacuum(clock.now()));
+        assertNull(pollForVacuum(clock.now()));
     }
 
     @Test
@@ -48,13 +48,13 @@ public abstract class AbstractMvPartitionStorageGcTest extends BaseMvPartitionSt
         HybridTimestamp secondCommitTs = addAndCommit(TABLE_ROW2);
 
         // Data is still visible for older timestamps.
-        assertNull(storage.pollForVacuum(firstCommitTs));
+        assertNull(pollForVacuum(firstCommitTs));
 
-        assertNull(storage.pollForVacuum(tsBetweenCommits));
+        assertNull(pollForVacuum(tsBetweenCommits));
 
         // Once a low watermark value becomes equal to second commit timestamp, previous value
         // becomes completely inaccessible and should be purged.
-        TableRowAndRowId gcedRow = storage.pollForVacuum(secondCommitTs);
+        TableRowAndRowId gcedRow = pollForVacuum(secondCommitTs);
 
         assertNotNull(gcedRow);
 
@@ -72,7 +72,7 @@ public abstract class AbstractMvPartitionStorageGcTest extends BaseMvPartitionSt
         addAndCommit(TABLE_ROW);
         HybridTimestamp secondCommitTs = addAndCommit(null);
 
-        TableRowAndRowId row = storage.pollForVacuum(secondCommitTs);
+        TableRowAndRowId row = pollForVacuum(secondCommitTs);
 
         assertNotNull(row);
         assertRowMatches(row.tableRow(), TABLE_ROW);
@@ -89,7 +89,7 @@ public abstract class AbstractMvPartitionStorageGcTest extends BaseMvPartitionSt
         addAndCommit(null);
         HybridTimestamp lastCommitTs = addAndCommit(null);
 
-        TableRowAndRowId row = storage.pollForVacuum(lastCommitTs);
+        TableRowAndRowId row = pollForVacuum(lastCommitTs);
 
         assertNotNull(row);
         assertRowMatches(row.tableRow(), TABLE_ROW);
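The assertions in the GC tests above encode the watermark rule: an older version may be purged only once the low watermark reaches the commit timestamp of the version that superseded it. That decision can be sketched on its own, with timestamps reduced to plain longs for illustration:

```java
public class WatermarkRule {
    /**
     * An older row version becomes garbage only when the commit timestamp of the version
     * that superseded it is at or below the low watermark; until then, readers at older
     * timestamps may still need the older version.
     */
    static boolean canVacuumOlderVersion(long supersedingCommitTs, long lowWatermark) {
        return supersedingCommitTs <= lowWatermark;
    }

    public static void main(String[] args) {
        long firstCommitTs = 10;
        long secondCommitTs = 20;

        // Watermark before the second commit: the first version is still visible, keep it.
        System.out.println(canVacuumOlderVersion(secondCommitTs, firstCommitTs)); // false
        // Watermark reached the second commit: the first version is unreachable, purge it.
        System.out.println(canVacuumOlderVersion(secondCommitTs, secondCommitTs)); // true
    }
}
```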
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 8d2c9e7d65..9c8164b22b 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -553,6 +553,15 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
         assertThat(returnedRow, is(nullValue()));
     }
 
+    @Test
+    void addWriteCommittedTombstone() {
+        addWriteCommitted(ROW_ID, tableRow, clock.now());
+        assertRowMatches(read(ROW_ID, HybridTimestamp.MAX_VALUE), tableRow);
+
+        addWriteCommitted(ROW_ID, null, clock.now());
+        assertNull(read(ROW_ID, HybridTimestamp.MAX_VALUE));
+    }
+
     @Test
     void addWriteCreatesUncommittedVersion() {
         RowId rowId = insert(tableRow, txId);
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index b7e83ce61f..d67b793f56 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 import java.util.List;
-import java.util.Locale;
 import java.util.Objects;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -52,58 +51,36 @@ import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
 import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
 
 /**
  * Base test for MV storages, contains pojo classes, their descriptor and a marshaller instance.
  */
 public abstract class BaseMvStoragesTest {
     /** Default reflection marshaller factory. */
-    protected static MarshallerFactory marshallerFactory;
+    protected static final MarshallerFactory marshallerFactory = new ReflectionMarshallerFactory();
 
     /** Schema descriptor for tests. */
-    protected static SchemaDescriptor schemaDescriptor;
+    protected static final SchemaDescriptor schemaDescriptor = new SchemaDescriptor(1, new Column[]{
+            new Column("INTKEY", NativeTypes.INT32, false),
+            new Column("STRKEY", NativeTypes.STRING, false),
+    }, new Column[]{
+            new Column("INTVAL", NativeTypes.INT32, false),
+            new Column("STRVAL", NativeTypes.STRING, false),
+    });
 
     /** Key-value marshaller for tests. */
-    protected static KvMarshaller<TestKey, TestValue> kvMarshaller;
+    protected static final KvMarshaller<TestKey, TestValue> kvMarshaller
+            = marshallerFactory.create(schemaDescriptor, TestKey.class, TestValue.class);
 
     /** Key-value {@link BinaryTuple} converter for tests. */
-    protected static BinaryConverter kvBinaryConverter;
+    protected static final BinaryConverter kvBinaryConverter = BinaryConverter.forRow(schemaDescriptor);
 
     /** Key {@link BinaryTuple} converter for tests. */
-    protected static BinaryConverter kBinaryConverter;
+    protected static final BinaryConverter kBinaryConverter = BinaryConverter.forKey(schemaDescriptor);
 
     /** Hybrid clock to generate timestamps. */
     protected final HybridClock clock = new HybridClockImpl();
 
-    @BeforeAll
-    static void beforeAll() {
-        marshallerFactory = new ReflectionMarshallerFactory();
-
-        schemaDescriptor = new SchemaDescriptor(1, new Column[]{
-                new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
-                new Column("strKey".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
-        }, new Column[]{
-                new Column("intVal".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
-                new Column("strVal".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
-        });
-
-        kvMarshaller = marshallerFactory.create(schemaDescriptor, TestKey.class, TestValue.class);
-
-        kvBinaryConverter = BinaryConverter.forRow(schemaDescriptor);
-        kBinaryConverter = BinaryConverter.forKey(schemaDescriptor);
-    }
-
-    @AfterAll
-    static void afterAll() {
-        kvMarshaller = null;
-        schemaDescriptor = null;
-        marshallerFactory = null;
-        kvBinaryConverter = null;
-        kBinaryConverter = null;
-    }
-
     protected static TableRow tableRow(TestKey key, TestValue value) {
         return TableRowConverter.fromBinaryRow(binaryRow(key, value), kvBinaryConverter);
     }
@@ -181,7 +158,7 @@ public abstract class BaseMvStoragesTest {
         }
     }
 
-    protected final void assertRowMatches(TableRow rowUnderQuestion, TableRow expectedRow) {
+    protected final void assertRowMatches(@Nullable TableRow rowUnderQuestion, TableRow expectedRow) {
         assertThat(rowUnderQuestion, is(notNullValue()));
         assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes())));
     }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
index c49a1ebe49..8eb1ab722b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/ColumnFamilyUtils.java
@@ -25,31 +25,24 @@ import org.rocksdb.RocksDB;
  * Utilities for converting partition IDs and index names into Column Family names and vice versa.
  */
 class ColumnFamilyUtils {
-    /**
-     * Name of the meta column family matches default columns family, meaning that it always exist when new table is created.
-     */
+    /** Name of the meta column family matches default columns family, meaning that it always exist when new table is created. */
     static final String META_CF_NAME = new String(RocksDB.DEFAULT_COLUMN_FAMILY, StandardCharsets.UTF_8);
 
-    /**
-     * Name of the Column Family that stores partition data.
-     */
+    /** Name of the Column Family that stores partition data. */
     static final String PARTITION_CF_NAME = "cf-part";
 
-    /**
-     * Name of the Column Family that stores hash index data.
-     */
+    /** Name of the Column Family that stores garbage collection queue. */
+    static final String GC_QUEUE_CF_NAME = "cf-gc";
+
+    /** Name of the Column Family that stores hash index data. */
     static final String HASH_INDEX_CF_NAME = "cf-hash";
 
-    /**
-     * Prefix for SQL indexes column family names.
-     */
+    /** Prefix for SQL indexes column family names. */
     static final String SORTED_INDEX_CF_PREFIX = "cf-sorted-";
 
-    /**
-     * Utility enum to describe a type of the column family - meta or partition.
-     */
+    /** Utility enum to describe a type of the column family - meta or partition. */
     enum ColumnFamilyType {
-        META, PARTITION, HASH_INDEX, SORTED_INDEX, UNKNOWN;
+        META, PARTITION, GC_QUEUE, HASH_INDEX, SORTED_INDEX, UNKNOWN;
 
         /**
          * Determines column family type by its name.
@@ -62,6 +55,8 @@ class ColumnFamilyUtils {
                 return META;
             } else if (PARTITION_CF_NAME.equals(cfName)) {
                 return PARTITION;
+            } else if (GC_QUEUE_CF_NAME.equals(cfName)) {
+                return GC_QUEUE;
             } else if (HASH_INDEX_CF_NAME.equals(cfName)) {
                 return HASH_INDEX;
             } else if (cfName.startsWith(SORTED_INDEX_CF_PREFIX)) {
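The name-based dispatch extended above (now including the new cf-gc family) can be exercised in isolation. This sketch mirrors the classifier with string literals, where "default" stands in for the bytes of RocksDB.DEFAULT_COLUMN_FAMILY; the class and method names are illustrative:

```java
public class CfNameClassifier {
    enum ColumnFamilyType { META, PARTITION, GC_QUEUE, HASH_INDEX, SORTED_INDEX, UNKNOWN }

    /** Mirrors ColumnFamilyUtils: exact names for the fixed families, a prefix match for sorted indexes. */
    static ColumnFamilyType fromCfName(String cfName) {
        if ("default".equals(cfName)) {
            return ColumnFamilyType.META;
        } else if ("cf-part".equals(cfName)) {
            return ColumnFamilyType.PARTITION;
        } else if ("cf-gc".equals(cfName)) {
            return ColumnFamilyType.GC_QUEUE;
        } else if ("cf-hash".equals(cfName)) {
            return ColumnFamilyType.HASH_INDEX;
        } else if (cfName.startsWith("cf-sorted-")) {
            return ColumnFamilyType.SORTED_INDEX;
        } else {
            return ColumnFamilyType.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        System.out.println(fromCfName("cf-gc"));          // GC_QUEUE
        System.out.println(fromCfName("cf-sorted-name")); // SORTED_INDEX
        System.out.println(fromCfName("cf-unrelated"));   // UNKNOWN
    }
}
```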
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
new file mode 100644
index 0000000000..cf25f55b63
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or writing.
+     * In this instance it is only used for RocksDB to get the size of the entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final PartitionDataHelper helper;
+
+    /** RocksDB instance. */
+    private final RocksDB db;
+
+    /** GC queue column family. */
+    private final ColumnFamilyHandle gcQueueCf;
+
+    GarbageCollector(PartitionDataHelper helper, RocksDB db, ColumnFamilyHandle gcQueueCf) {
+        this.helper = helper;
+        this.db = db;
+        this.gcQueueCf = gcQueueCf;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        ColumnFamilyHandle partCf = helper.partCf;
+
+        boolean newAndPrevTombstones = false;
+
+        // Try find previous value for the row id.
+        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+        keyBuffer.clear();
+
+        helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (RocksIterator it = db.newIterator(partCf, helper.upperBoundReadOpts)) {
+            it.seek(keyBuffer);
+
+            if (invalid(it)) {
+                return false;
+            }
+
+            keyBuffer.clear();
+
+            int keyLen = it.key(keyBuffer);
+
+            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+            if (readRowId.equals(rowId)) {
+                // Found previous value.
+                assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+                if (isNewValueTombstone) {
+                    // If new value is a tombstone, let's check if previous value was also a tombstone.
+                    int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+                    newAndPrevTombstones = valueSize == 0;
+                }
+
+                if (!newAndPrevTombstones) {
+                    keyBuffer.clear();
+
+                    helper.putGcKey(keyBuffer, rowId, timestamp);
+
+                    writeBatch.put(gcQueueCf, keyBuffer, EMPTY_DIRECT_BUFFER);
+                }
+            }
+        }
+
+        return newAndPrevTombstones;
+    }
+
+    /**
+     * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
+        ColumnFamilyHandle partCf = helper.partCf;
+
+        // We retrieve the first element of the GC queue and seek for it in the data CF.
+        // However, the element that we need to garbage collect is the next (older one) element.
+        // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want to garbage collect.
+        try (RocksIterator gcIt = db.newIterator(gcQueueCf, helper.upperBoundReadOpts)) {
+            gcIt.seek(helper.partitionStartPrefix());
+
+            if (invalid(gcIt)) {
+                // GC queue is empty.
+                return null;
+            }
+
+            ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+            gcKeyBuffer.clear();
+
+            gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+            if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+                // No elements to garbage collect.
+                return null;
+            }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, GC_KEY_ROW_ID_OFFSET);
+
+            // Delete element from the GC queue.
+            batch.delete(gcQueueCf, gcKeyBuffer);
+
+            try (RocksIterator it = db.newIterator(partCf, helper.upperBoundReadOpts)) {
+                // Process the element in data cf that triggered the addition to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, batch, gcElementRowId, gcElementTimestamp);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // Find the row that should be garbage collected.
+                ByteBuffer dataKey = getRowForGcKey(it, gcElementRowId);
+
+                if (dataKey == null) {
+                    // No row for GC.
+                    return null;
+                }
+
+                // At this point there's definitely a value that needs to be garbage collected in the iterator.
+                byte[] valueBytes = it.value();
+
+                var row = new TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, gcElementRowId);
+
+                // Delete the row from the data cf.
+                batch.delete(partCf, dataKey);
+
+                return retVal;
+            }
+        }
+    }
+
+    /**
+     * Processes the entry that triggered adding row id to garbage collector's queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as no further processing is required.
+     * If there is a row and this entry is a tombstone, removes the tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param gcElementRowId Row id of the element from the GC queue.
+     * @param gcElementTimestamp Timestamp of the element from the GC queue.
+     * @return {@code true} if further processing by garbage collector is needed.
+     */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, 
WriteBatchWithIndex batch, RowId gcElementRowId,
+            HybridTimestamp gcElementTimestamp) throws RocksDBException {
+        ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+        dataKeyBuffer.clear();
+
+        ColumnFamilyHandle partCf = helper.partCf;
+
+        // Set up the data key.
+        helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+        // Seek to the row id and timestamp from the GC queue.
+        // Note that it doesn't mean that the element in this iterator has matching row id or even partition id.
+        it.seek(dataKeyBuffer);
+
+        if (invalid(it)) {
+            // There is no row for the GC queue element.
+            return false;
+        } else {
+            dataKeyBuffer.clear();
+
+            it.key(dataKeyBuffer);
+
+            if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId)) {
+                // There is no row for the GC queue element.
+                return false;
+            }
+        }
+
+        // Check if the new element, whose insertion scheduled the GC, was a tombstone.
+        int len = it.value(EMPTY_DIRECT_BUFFER);
+
+        if (len == 0) {
+            // This is a tombstone, we need to delete it.
+            batch.delete(partCf, dataKeyBuffer);
+        }
+
+        return true;
+    }
+
+    /**
+     * Checks if there is a row for garbage collection and returns this row's key if it exists.
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param gcElementRowId Row id of the element from the GC queue.
+     * @return Key of the row that needs to be garbage collected, or {@code null} if such a row doesn't exist.
+     */
+    private @Nullable ByteBuffer getRowForGcKey(RocksIterator it, RowId gcElementRowId) {
+        // Let's move to the element that was scheduled for GC.
+        it.next();
+
+        RowId gcRowId;
+
+        if (invalid(it)) {
+            return null;
+        }
+
+        ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+        dataKeyBuffer.clear();
+
+        int keyLen = it.key(dataKeyBuffer);
+
+        // Check if we moved to another row id's write-intent, that would mean that there is no row to GC for the current row id.
+        if (keyLen == MAX_KEY_SIZE) {
+            gcRowId = helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET);
+
+            // We might have moved to the next row id.
+            if (gcElementRowId.equals(gcRowId)) {
+                return dataKeyBuffer;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Deletes garbage collector's queue.
+     *
+     * @param writeBatch Write batch.
+     * @throws RocksDBException If failed to delete the queue.
+     */
+    void deleteQueue(WriteBatch writeBatch) throws RocksDBException {
+        writeBatch.deleteRange(gcQueueCf, helper.partitionStartPrefix(), helper.partitionEndPrefix());
+    }
+}
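The GC queue above works because its key layout, `| partId | timestamp (natural order) | rowId |`, makes the oldest entry of a partition the first key an iterator sees at the partition prefix: a single seek finds the next GC candidate, and the low-watermark comparison decides whether to stop. The sketch below models this ordering with a `TreeMap` over unsigned lexicographic byte order standing in for a RocksDB column family; the class name and helper methods are hypothetical, not part of the Ignite API.

```java
import java.nio.ByteBuffer;
import java.util.TreeMap;

// Hypothetical stand-in for the GC queue column family: a TreeMap sorted by
// unsigned lexicographic byte order, like RocksDB's default comparator.
public class GcQueueOrderSketch {
    // Encodes | partId (2 bytes) | timestamp (12 bytes, natural order) | as a key.
    static byte[] gcKey(int partId, long physical, int logical) {
        return ByteBuffer.allocate(Short.BYTES + Long.BYTES + Integer.BYTES)
                .putShort((short) partId)
                .putLong(physical)   // ascending ("natural") order: older timestamps sort first
                .putInt(logical)
                .array();
    }

    // Unsigned lexicographic comparison, mirroring RocksDB key ordering.
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        TreeMap<byte[], String> queue = new TreeMap<>(GcQueueOrderSketch::compareUnsigned);
        queue.put(gcKey(0, 200, 0), "newer");
        queue.put(gcKey(0, 100, 5), "older");

        // The first key at the partition prefix is the oldest entry, so one seek
        // is enough to find the next GC candidate.
        if (!"older".equals(queue.firstEntry().getValue())) {
            throw new AssertionError();
        }
        System.out.println(queue.firstEntry().getValue());
    }
}
```

This is also why `vacuum` can return `null` as soon as the decoded timestamp of the first entry exceeds the low watermark: everything after it in the partition is newer still.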
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
new file mode 100644
index 0000000000..00562bcd07
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.Slice;
+
+/** Helper for the partition data. */
+class PartitionDataHelper implements ManuallyCloseable {
+    /** Commit partition id size. */
+    static final int PARTITION_ID_SIZE = Short.BYTES;
+
+    /** UUID size in bytes. */
+    static final int ROW_ID_SIZE = 2 * Long.BYTES;
+
+    /** Position of row id inside the key. */
+    static final int ROW_ID_OFFSET = Short.BYTES;
+
+    /** Size of the key without timestamp. */
+    static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Maximum size of the data key. */
+    static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
+
+    /** Transaction id size (part of the transaction state). */
+    private static final int TX_ID_SIZE = 2 * Long.BYTES;
+
+    /** Commit table id size (part of the transaction state). */
+    private static final int TABLE_ID_SIZE = 2 * Long.BYTES;
+
+    /** Size of the value header (transaction state). */
+    static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + 
PARTITION_ID_SIZE;
+
+    /** Transaction id offset. */
+    static final int TX_ID_OFFSET = 0;
+
+    /** Commit table id offset. */
+    static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+
+    /** Commit partition id offset. */
+    static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+
+    /** Value offset (if transaction state is present). */
+    static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+
+    static final ByteOrder TABLE_ROW_BYTE_ORDER = TableRow.ORDER;
+
+    static final ByteOrder KEY_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Partition id. */
+    private final int partitionId;
+
+    /** Upper bound for scans. */
+    private final Slice upperBound;
+
+    /** Partition data column family. */
+    final ColumnFamilyHandle partCf;
+
+    /** Read options for regular scans. */
+    final ReadOptions upperBoundReadOpts;
+
+    /** Read options for total order scans. */
+    final ReadOptions scanReadOpts;
+
+    PartitionDataHelper(int partitionId, ColumnFamilyHandle partCf) {
+        this.partitionId = partitionId;
+        this.partCf = partCf;
+
+        this.upperBound = new Slice(partitionEndPrefix());
+        this.upperBoundReadOpts = new ReadOptions().setIterateUpperBound(upperBound);
+        this.scanReadOpts = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+    }
+
+    /**
+     * Creates a prefix of all keys in the given partition.
+     */
+    byte[] partitionStartPrefix() {
+        return unsignedShortAsBytes(partitionId);
+    }
+
+    /**
+     * Creates a prefix of all keys in the next partition, used as an exclusive bound.
+     */
+    byte[] partitionEndPrefix() {
+        return unsignedShortAsBytes(partitionId + 1);
+    }
+
+    void putDataKey(ByteBuffer dataKeyBuffer, RowId rowId, HybridTimestamp timestamp) {
+        dataKeyBuffer.putShort((short) partitionId);
+        putRowId(dataKeyBuffer, rowId);
+        putTimestampDesc(dataKeyBuffer, timestamp);
+
+        dataKeyBuffer.flip();
+    }
+
+    void putGcKey(ByteBuffer gcKeyBuffer, RowId rowId, HybridTimestamp timestamp) {
+        gcKeyBuffer.putShort((short) partitionId);
+        putTimestampNatural(gcKeyBuffer, timestamp);
+        putRowId(gcKeyBuffer, rowId);
+
+        gcKeyBuffer.flip();
+    }
+
+    void putRowId(ByteBuffer keyBuffer, RowId rowId) {
+        assert rowId.partitionId() == partitionId : rowId;
+        assert keyBuffer.order() == KEY_BYTE_ORDER;
+
+        keyBuffer.putLong(normalize(rowId.mostSignificantBits()));
+        keyBuffer.putLong(normalize(rowId.leastSignificantBits()));
+    }
+
+    RowId getRowId(ByteBuffer keyBuffer, int offset) {
+        assert partitionId == (keyBuffer.getShort(0) & 0xFFFF);
+
+        return new RowId(partitionId, normalize(keyBuffer.getLong(offset)), normalize(keyBuffer.getLong(offset + Long.BYTES)));
+    }
+
+    /**
+     * Converts a signed long into a new long value that, when written in Big Endian, preserves the comparison order when compared
+     * lexicographically as an array of unsigned bytes. For example, values {@code -1} and {@code 0}, when written in BE, will become
+     * {@code 0xFF..F} and {@code 0x00..0}, and lose their ascending order.
+     *
+     * <p>Flipping the sign bit will change the situation: {@code -1 -> 0x7F..F} and {@code 0 -> 0x80..0}.
+     */
+    static long normalize(long value) {
+        return value ^ (1L << 63);
+    }
+
+    /**
+     * Stores unsigned short (represented by int) in the byte array.
+     *
+     * @param value Unsigned short value.
+     * @return Byte array with unsigned short.
+     */
+    private static byte[] unsignedShortAsBytes(int value) {
+        return new byte[] {(byte) (value >>> 8), (byte) value};
+    }
+
+    /**
+     * Writes a timestamp into a byte buffer, in descending lexicographical bytes order.
+     */
+    static void putTimestampDesc(ByteBuffer buf, HybridTimestamp ts) {
+        assert buf.order() == KEY_BYTE_ORDER;
+
+        // "bitwise negation" turns ascending order into a descending one.
+        buf.putLong(~ts.getPhysical());
+        buf.putInt(~ts.getLogical());
+    }
+
+    static HybridTimestamp readTimestampDesc(ByteBuffer keyBuf) {
+        assert keyBuf.order() == KEY_BYTE_ORDER;
+
+        long physical = ~keyBuf.getLong(ROW_PREFIX_SIZE);
+        int logical = ~keyBuf.getInt(ROW_PREFIX_SIZE + Long.BYTES);
+
+        return new HybridTimestamp(physical, logical);
+    }
+
+    /**
+     * Writes a timestamp into a byte buffer, in ascending lexicographical bytes order.
+     */
+    static void putTimestampNatural(ByteBuffer buf, HybridTimestamp ts) {
+        assert buf.order() == KEY_BYTE_ORDER;
+
+        buf.putLong(ts.getPhysical());
+        buf.putInt(ts.getLogical());
+    }
+
+    static HybridTimestamp readTimestampNatural(ByteBuffer keyBuf, int offset) {
+        assert keyBuf.order() == KEY_BYTE_ORDER;
+
+        long physical = keyBuf.getLong(offset);
+        int logical = keyBuf.getInt(offset + Long.BYTES);
+
+        return new HybridTimestamp(physical, logical);
+    }
+
+    @Override
+    public void close() {
+        RocksUtils.closeAll(upperBoundReadOpts, upperBound);
+    }
+}
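The two encoding tricks in `PartitionDataHelper` — flipping the sign bit in `normalize` so signed longs keep their order when compared as unsigned bytes, and negating the timestamp components in `putTimestampDesc` so newer timestamps sort first — can be checked in isolation. This standalone sketch (class and method names are illustrative, not Ignite API) reproduces both properties:

```java
import java.nio.ByteBuffer;

// Standalone check of the key-encoding tricks described above: sign-bit flip
// for signed longs and bitwise negation for descending timestamp order.
public class KeyOrderSketch {
    // Same as PartitionDataHelper.normalize: flip the sign bit.
    static long normalize(long value) {
        return value ^ (1L << 63);
    }

    // Unsigned lexicographic comparison, like RocksDB's default key comparator.
    static int compareBytes(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    static byte[] asBytes(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array(); // Big Endian by default
    }

    public static void main(String[] args) {
        // Without normalization, -1L encodes as 0xFF..FF and sorts after 0L.
        if (compareBytes(asBytes(-1L), asBytes(0L)) <= 0) {
            throw new AssertionError();
        }
        // With normalization, the ascending signed order is preserved.
        if (compareBytes(asBytes(normalize(-1L)), asBytes(normalize(0L))) >= 0) {
            throw new AssertionError();
        }
        // Negating the physical component reverses the order: the newer
        // timestamp (200) sorts before the older one (100), giving N2O order.
        if (compareBytes(asBytes(~100L), asBytes(~200L)) <= 0) {
            throw new AssertionError();
        }
        System.out.println("ordering properties hold");
    }
}
```

The descending variant is what lets a single `seek` with a timestamp-bounded key land on the newest version that is not newer than the requested timestamp.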
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4507794523..b4c84a3a6b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -18,10 +18,23 @@
 package org.apache.ignite.internal.storage.rocksdb;
 
 import static java.lang.ThreadLocal.withInitial;
-import static java.nio.ByteBuffer.allocateDirect;
+import static java.nio.ByteBuffer.allocate;
 import static java.util.Arrays.copyOf;
 import static java.util.Arrays.copyOfRange;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.PARTITION_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_PREFIX_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TX_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_HEADER_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.VALUE_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.normalize;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.putTimestampDesc;
+import static org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampDesc;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.partitionIdKey;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
 import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
@@ -34,7 +47,6 @@ import static org.apache.ignite.internal.util.ByteUtils.putUuidToBytes;
 import static org.rocksdb.ReadTier.PERSISTED_TIER;
 
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -53,8 +65,10 @@ import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.storage.util.StorageState;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.GridUnsafe;
@@ -75,79 +89,35 @@ import org.rocksdb.WriteOptions;
 /**
  * Multi-versioned partition storage implementation based on RocksDB. Stored data has the following format.
  *
- * <p/>Key:
- * <pre><code>
+ * <p>Key:
+ * <pre>{@code
  * For write-intents
  * | partId (2 bytes, BE) | rowId (16 bytes, BE) |
  *
  * For committed rows
  * | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (12 bytes, DESC) |
- * </code></pre>
+ * }</pre>
  * Value:
- * <pre><code>
+ * <pre>{@code
  * For write-intents
  * | txId (16 bytes) | commitTableId (16 bytes) | commitPartitionId (2 bytes) | Row data |
  *
  * For committed rows
  * | Row data |
- * </code></pre>
+ * }</pre>
  *
- * <p/>Pending transactions (write-intents) data doesn't have a timestamp assigned, but they have transaction
+ * <p>Pending transactions (write-intents) data doesn't have a timestamp assigned, but they have transaction
  * state (txId, commitTableId and commitPartitionId).
  *
- * <p/>BE means Big Endian, meaning that lexicographical bytes order matches a natural order of partitions.
+ * <p>BE means Big Endian, meaning that lexicographical bytes order matches a natural order of partitions.
  *
- * <p/>DESC means that timestamps are sorted from newest to oldest (N2O). Please refer to {@link #putTimestamp(ByteBuffer, HybridTimestamp)}
+ * <p>DESC means that timestamps are sorted from newest to oldest (N2O).
+ * Please refer to {@link PartitionDataHelper#putTimestampDesc(ByteBuffer, HybridTimestamp)}
  * to see how it's achieved. Missing timestamp could be interpreted as a moment infinitely far away in the future.
  */
 public class RocksDbMvPartitionStorage implements MvPartitionStorage {
-    /** Position of row id inside the key. */
-    private static final int ROW_ID_OFFSET = Short.BYTES;
-
-    /** UUID size in bytes. */
-    private static final int ROW_ID_SIZE = 2 * Long.BYTES;
-
-    /** Size of the key without timestamp. */
-    private static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
-
-    /** Transaction id size (part of the transaction state). */
-    private static final int TX_ID_SIZE = 2 * Long.BYTES;
-
-    /** Commit table id size (part of the transaction state). */
-    private static final int TABLE_ID_SIZE = 2 * Long.BYTES;
-
-    /** Commit partition id size (part of the transaction state). */
-    private static final int PARTITION_ID_SIZE = Short.BYTES;
-
-    /** Size of the value header (transaction state). */
-    private static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + PARTITION_ID_SIZE;
-
-    /** Transaction id offset. */
-    private static final int TX_ID_OFFSET = 0;
-
-    /** Commit table id offset. */
-    private static final int TABLE_ID_OFFSET = TX_ID_SIZE;
-
-    /** Commit partition id offset. */
-    private static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
-
-    /** Value offset (if transaction state is present). */
-    private static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
-
-    /** Maximum size of the key. */
-    private static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
-
-    private static final ByteOrder KEY_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
-
-    private static final ByteOrder TABLE_ROW_BYTE_ORDER = TableRow.ORDER;
-
-    /** Thread-local direct buffer instance to read keys from RocksDB. */
-    private static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
-
     /** Thread-local on-heap byte buffer instance to use for key manipulations. */
-    private static final ThreadLocal<ByteBuffer> HEAP_KEY_BUFFER = withInitial(
-            () -> ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER)
-    );
+    private static final ThreadLocal<ByteBuffer> HEAP_KEY_BUFFER = withInitial(() -> allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
 
     /** Thread-local write batch for {@link #runConsistently(WriteClosure)}. */
     private final ThreadLocal<WriteBatchWithIndex> threadLocalWriteBatch = new ThreadLocal<>();
@@ -165,8 +135,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** RocksDb instance. */
     private final RocksDB db;
 
-    /** Partitions column family. */
-    private final ColumnFamilyHandle cf;
+    /** Helper for the rocksdb partition. */
+    private final PartitionDataHelper helper;
+
+    /** Garbage collector. */
+    private final GarbageCollector gc;
 
     /** Meta column family. */
     private final ColumnFamilyHandle meta;
@@ -180,14 +153,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Read options for reading persisted data. */
     private final ReadOptions persistedTierReadOpts = new ReadOptions().setReadTier(PERSISTED_TIER);
 
-    /** Upper bound for scans and reads. */
-    private final Slice upperBound;
-
-    private final ReadOptions upperBoundReadOpts;
-
-    /** Read options for scan iterators. */
-    private final ReadOptions scanReadOptions;
-
     /** Key to store applied index value in meta. */
     private final byte[] lastAppliedIndexKey;
 
@@ -233,13 +198,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         this.tableStorage = tableStorage;
         this.partitionId = partitionId;
         db = tableStorage.db();
-        cf = tableStorage.partitionCfHandle();
         meta = tableStorage.metaCfHandle();
-
-        upperBound = new Slice(partitionEndPrefix());
-
-        upperBoundReadOpts = new ReadOptions().setIterateUpperBound(upperBound);
-        scanReadOptions = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+        helper = new PartitionDataHelper(partitionId, tableStorage.partitionCfHandle());
+        gc = new GarbageCollector(helper, db, tableStorage.gcQueueHandle());
 
         lastAppliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
         lastAppliedTermKey = ("term" + partitionId).getBytes(StandardCharsets.UTF_8);
@@ -377,12 +338,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     }
 
     /**
-     * Reads a value of {@link #lastAppliedIndex()} from the storage, avoiding memtable, and sets it as a new value of
+     * Reads a value of {@link #lastAppliedIndex()} from the storage, avoiding mem-table, and sets it as a new value of
      * {@link #persistedIndex()}.
      *
      * @throws StorageException If failed to read index from the storage.
      */
-    public void refreshPersistedIndex() throws StorageException {
+    void refreshPersistedIndex() throws StorageException {
         if (!busyLock.enterBusy()) {
             // Don't throw the exception, there's no point in that.
             return;
@@ -433,7 +394,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * @param readOptions Read options to be used for reading.
      * @return Group configuration.
      */
-    private RaftGroupConfiguration readLastGroupConfig(ReadOptions readOptions) {
+    private @Nullable RaftGroupConfiguration readLastGroupConfig(ReadOptions readOptions) {
         byte[] bytes;
 
         try {
@@ -461,7 +422,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 byte[] keyBytes = copyOf(keyBufArray, ROW_PREFIX_SIZE);
 
-                byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
+                byte[] previousValue = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, keyBytes);
 
                 // Previous value must belong to the same transaction.
                 if (previousValue != null) {
@@ -474,7 +435,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                     // Write empty value as a tombstone.
                     if (previousValue != null) {
                         // Reuse old array with transaction id already written to it.
-                        writeBatch.put(cf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE));
+                        writeBatch.put(helper.partCf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE));
                     } else {
                         byte[] valueHeaderBytes = new byte[VALUE_HEADER_SIZE];
 
@@ -482,13 +443,13 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                         putUuidToBytes(commitTableId, valueHeaderBytes, TABLE_ID_OFFSET);
                         putShort(valueHeaderBytes, PARTITION_ID_OFFSET, (short) commitPartitionId);
 
-                        writeBatch.put(cf, keyBytes, valueHeaderBytes);
+                        writeBatch.put(helper.partCf, keyBytes, valueHeaderBytes);
                     }
                 } else {
                     writeUnversioned(keyBufArray, row, txId, commitTableId, commitPartitionId);
                 }
             } catch (RocksDBException e) {
-                throw new StorageException("Failed to update a row in storage", e);
+                throw new StorageException("Failed to update a row in storage: " + createStorageInfo(), e);
             }
 
             return res;
@@ -509,7 +470,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         byte[] rowBytes = rowBytes(row);
 
-        ByteBuffer value = ByteBuffer.allocate(rowBytes.length + VALUE_HEADER_SIZE);
+        ByteBuffer value = allocate(rowBytes.length + VALUE_HEADER_SIZE);
         byte[] array = value.array();
 
         putUuidToBytes(txId, array, TX_ID_OFFSET);
@@ -519,7 +480,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         value.position(VALUE_OFFSET).put(rowBytes);
 
         // Write table row data as a value.
-        writeBatch.put(cf, copyOf(keyArray, ROW_PREFIX_SIZE), value.array());
+        writeBatch.put(helper.partCf, copyOf(keyArray, ROW_PREFIX_SIZE), value.array());
     }
 
     private static byte[] rowBytes(TableRow row) {
@@ -532,14 +493,14 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         return busy(() -> {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
 
-            WriteBatchWithIndex writeBatch = requireWriteBatch();
+            @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
 
             ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
             try {
                 byte[] keyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
 
-                byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
+                byte[] previousValue = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, keyBytes);
 
                 if (previousValue == null) {
                     //the chain doesn't contain an uncommitted write intent
@@ -547,7 +508,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 }
 
                 // Perform unconditional remove for the key without associated timestamp.
-                writeBatch.delete(cf, keyBytes);
+                writeBatch.delete(helper.partCf, keyBytes);
 
                 return wrapValueIntoTableRow(previousValue, true);
             } catch (RocksDBException e) {
@@ -567,20 +528,33 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 // Read a value associated with pending write.
                 byte[] uncommittedKeyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
 
-                byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, uncommittedKeyBytes);
+                byte[] valueBytes = writeBatch.getFromBatchAndDB(db, helper.partCf, readOpts, uncommittedKeyBytes);
 
                 if (valueBytes == null) {
-                    //the chain doesn't contain an uncommitted write intent
+                    // The chain doesn't contain an uncommitted write intent.
                     return null;
                 }
 
-                // Delete pending write.
-                writeBatch.delete(cf, uncommittedKeyBytes);
+                boolean isNewValueTombstone = valueBytes.length == VALUE_HEADER_SIZE;
 
-                // Add timestamp to the key, and put the value back into the storage.
-                putTimestamp(keyBuf, timestamp);
+                // Both this and previous values for the row id are tombstones.
+                boolean newAndPrevTombstones = gc.tryAddToGcQueue(writeBatch, rowId, timestamp, isNewValueTombstone);
 
-                writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
+                // Delete pending write.
+                writeBatch.delete(helper.partCf, uncommittedKeyBytes);
+
+                // We only write tombstone if the previous value for the same row id was not a tombstone.
+                // So there won't be consecutive tombstones for the same row id.
+                if (!newAndPrevTombstones) {
+                    // Add timestamp to the key, and put the value back into the storage.
+                    putTimestampDesc(keyBuf, timestamp);
+
+                    writeBatch.put(
+                            helper.partCf,
+                            copyOf(keyBuf.array(), MAX_KEY_SIZE),
+                            copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length)
+                    );
+                }
 
                 return null;
             } catch (RocksDBException e) {
@@ -595,18 +569,31 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
 
             ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
-            putTimestamp(keyBuf, commitTimestamp);
+            putTimestampDesc(keyBuf, commitTimestamp);
+
+            boolean isNewValueTombstone = row == null;
 
             //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
-            byte[] rowBytes = rowBytes(row);
+            byte[] rowBytes = row != null ? rowBytes(row) : ArrayUtils.BYTE_EMPTY_ARRAY;
 
+            boolean newAndPrevTombstones; // Both this and previous values for the row id are tombstones.
             try {
-                writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
-
-                return null;
+                newAndPrevTombstones = gc.tryAddToGcQueue(writeBatch, rowId, commitTimestamp, isNewValueTombstone);
             } catch (RocksDBException e) {
-                throw new StorageException("Failed to update a row in storage", e);
+                throw new StorageException("Failed to add row to the GC queue: " + createStorageInfo(), e);
+            }
+
+            // We only write tombstone if the previous value for the same row id was not a tombstone.
+            // So there won't be consecutive tombstones for the same row id.
+            if (!newAndPrevTombstones) {
+                try {
+                    writeBatch.put(helper.partCf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
+                } catch (RocksDBException e) {
+                    throw new StorageException("Failed to update a row in storage: " + createStorageInfo(), e);
+                }
             }
+
+            return null;
         });
     }
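The commit path above enforces one invariant worth making explicit: a tombstone is written only when the latest committed version for the row id is not already a tombstone, so a version chain never holds consecutive tombstones (which would leave nothing for GC to reclaim between them). A minimal in-memory model of that rule, with hypothetical names and a plain `Deque` standing in for the DESC-ordered version chain:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal model of the tombstone-coalescing rule used by commitWrite/commitPendingVersion:
// a tombstone is committed only when the newest committed version is not already
// a tombstone, so version chains never contain consecutive tombstones.
public class TombstoneCoalescingSketch {
    static final byte[] TOMBSTONE = new byte[0]; // empty value marks a tombstone

    // Newest version at the head, mimicking the DESC timestamp key order.
    final Deque<byte[]> versionChain = new ArrayDeque<>();

    void commit(byte[] value) {
        boolean newIsTombstone = value.length == 0;
        boolean prevIsTombstone = !versionChain.isEmpty() && versionChain.peekFirst().length == 0;

        // Skip the write when both the new and the previous value are tombstones.
        if (newIsTombstone && prevIsTombstone) {
            return;
        }

        versionChain.addFirst(value);
    }

    public static void main(String[] args) {
        TombstoneCoalescingSketch chain = new TombstoneCoalescingSketch();

        chain.commit(new byte[] {1}); // regular row
        chain.commit(TOMBSTONE);      // delete: written, previous value was a row
        chain.commit(TOMBSTONE);      // coalesced away: previous value is already a tombstone

        if (chain.versionChain.size() != 2) {
            throw new AssertionError();
        }
        System.out.println("chain length: " + chain.versionChain.size());
    }
}
```

In the real storage the same decision is derived from `gc.tryAddToGcQueue(...)`, which reports whether both the new and the previous committed value are tombstones.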
 
@@ -625,11 +612,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             try (
                     // Set next partition as an upper bound.
-                    RocksIterator baseIterator = db.newIterator(cf, upperBoundReadOpts);
+                    RocksIterator baseIterator = db.newIterator(helper.partCf, helper.upperBoundReadOpts);
                     // "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
                     // It's not documented, but this is exactly how it should be used.
                     RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
-                            ? writeBatch.newIteratorWithBase(cf, baseIterator)
+                            ? writeBatch.newIteratorWithBase(helper.partCf, baseIterator)
                             : baseIterator
             ) {
                 if (lookingForLatestVersions(timestamp)) {
@@ -659,7 +646,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             return ReadResult.empty(rowId);
         }
 
-        ByteBuffer readKeyBuf = 
MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+        ByteBuffer readKeyBuf = 
MV_KEY_BUFFER.get().rewind().limit(MAX_KEY_SIZE);
 
         int keyLength = seekIterator.key(readKeyBuf);
 
@@ -682,7 +669,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
         if (!isWriteIntent) {
             // There is no write-intent, return latest committed row.
-            return wrapCommittedValue(rowId, valueBytes, 
readTimestamp(keyBuf));
+            return wrapCommittedValue(rowId, valueBytes, 
readTimestampDesc(keyBuf));
         }
 
         return wrapUncommittedValue(rowId, valueBytes, null);
@@ -700,7 +687,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
         // Put timestamp restriction according to N2O timestamps order.
-        putTimestamp(keyBuf, timestamp);
+        putTimestampDesc(keyBuf, timestamp);
 
         // This seek will either find a key with timestamp that's less or 
equal than required value, or a different key whatsoever.
         // It is guaranteed by descending order of timestamps.
@@ -724,7 +711,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         // There's no guarantee that required key even exists. If it doesn't, 
then "seek" will point to a different key.
         // To avoid returning its value, we have to check that actual key 
matches what we need.
         // Here we prepare direct buffer to read key without timestamp. Shared 
direct buffer is used to avoid extra memory allocations.
-        ByteBuffer foundKeyBuf = 
MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+        ByteBuffer foundKeyBuf = 
MV_KEY_BUFFER.get().rewind().limit(MAX_KEY_SIZE);
 
         int keyLength = 0;
 
@@ -743,7 +730,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 return ReadResult.empty(rowId);
             }
 
-            foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+            foundKeyBuf.rewind().limit(MAX_KEY_SIZE);
             keyLength = seekIterator.key(foundKeyBuf);
 
             if (!matches(rowId, foundKeyBuf)) {
@@ -764,7 +751,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                     return wrapUncommittedValue(rowId, valueBytes, null);
                 }
 
-                foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+                foundKeyBuf.rewind().limit(MAX_KEY_SIZE);
                 seekIterator.key(foundKeyBuf);
 
                 if (!matches(rowId, foundKeyBuf)) {
@@ -779,7 +766,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             // Should not be write-intent, as we were seeking with the 
timestamp.
             assert keyLength == MAX_KEY_SIZE;
 
-            HybridTimestamp rowTimestamp = readTimestamp(foundKeyBuf);
+            HybridTimestamp rowTimestamp = readTimestampDesc(foundKeyBuf);
 
             byte[] valueBytes = seekIterator.value();
 
@@ -797,7 +784,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 return wrapCommittedValue(rowId, valueBytes, rowTimestamp);
             }
 
-            foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+            foundKeyBuf.rewind().limit(MAX_KEY_SIZE);
             keyLength = seekIterator.key(foundKeyBuf);
 
             if (!matches(rowId, foundKeyBuf)) {
@@ -811,7 +798,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 return wrapUncommittedValue(rowId, seekIterator.value(), 
rowTimestamp);
             }
 
-            return wrapCommittedValue(rowId, valueBytes, 
readTimestamp(foundKeyBuf));
+            return wrapCommittedValue(rowId, valueBytes, 
readTimestampDesc(foundKeyBuf));
         }
     }
 
@@ -844,12 +831,12 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
             var options = new 
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
 
-            RocksIterator it = db.newIterator(cf, options);
+            RocksIterator it = db.newIterator(helper.partCf, options);
 
             WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
 
             if (writeBatch != null && writeBatch.count() > 0) {
-                it = writeBatch.newIteratorWithBase(cf, it);
+                it = writeBatch.newIteratorWithBase(helper.partCf, it);
             }
 
             it.seek(lowerBound);
@@ -908,15 +895,15 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         });
     }
 
-    private void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, @Nullable 
HybridTimestamp timestamp) {
+    private static void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, @Nullable 
HybridTimestamp timestamp) {
         keyBuf.putLong(ROW_ID_OFFSET, normalize(rowId.mostSignificantBits()));
         keyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, 
normalize(rowId.leastSignificantBits()));
 
         if (timestamp != null) {
-            putTimestamp(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
+            putTimestampDesc(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
         }
 
-        keyBuf.position(0);
+        keyBuf.rewind();
     }
 
     @Override
@@ -924,9 +911,9 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         return busy(() -> {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-            ByteBuffer keyBuf = 
prepareHeapKeyBuf(lowerBound).position(0).limit(ROW_PREFIX_SIZE);
+            ByteBuffer keyBuf = 
prepareHeapKeyBuf(lowerBound).rewind().limit(ROW_PREFIX_SIZE);
 
-            try (RocksIterator it = db.newIterator(cf, scanReadOptions)) {
+            try (RocksIterator it = db.newIterator(helper.partCf, 
helper.scanReadOpts)) {
                 it.seek(keyBuf);
 
                 if (!it.isValid()) {
@@ -935,7 +922,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                     return null;
                 }
 
-                ByteBuffer readKeyBuf = 
MV_KEY_BUFFER.get().position(0).limit(ROW_PREFIX_SIZE);
+                ByteBuffer readKeyBuf = 
MV_KEY_BUFFER.get().rewind().limit(ROW_PREFIX_SIZE);
 
                 it.key(readKeyBuf);
 
@@ -969,13 +956,11 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
         buf.putShort(0, partitionId);
 
-        buf.position(0);
+        buf.rewind();
     }
 
     private RowId getRowId(ByteBuffer keyBuffer) {
-        keyBuffer.position(ROW_ID_OFFSET);
-
-        return new RowId(partitionId, normalize(keyBuffer.getLong()), 
normalize(keyBuffer.getLong()));
+        return helper.getRowId(keyBuffer, ROW_ID_OFFSET);
     }
 
     @Override
@@ -984,11 +969,11 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
             try (
-                    var upperBound = new Slice(partitionEndPrefix());
+                    var upperBound = new Slice(helper.partitionEndPrefix());
                     var options = new 
ReadOptions().setIterateUpperBound(upperBound);
-                    RocksIterator it = db.newIterator(cf, options)
+                    RocksIterator it = db.newIterator(helper.partCf, options)
             ) {
-                it.seek(partitionStartPrefix());
+                it.seek(helper.partitionStartPrefix());
 
                 long size = 0;
 
@@ -1005,14 +990,29 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     /**
      * Deletes partition data from the storage, using write batch to perform 
the operation.
      */
-    public void destroyData(WriteBatch writeBatch) throws RocksDBException {
+    void destroyData(WriteBatch writeBatch) throws RocksDBException {
         writeBatch.delete(meta, lastAppliedIndexKey);
         writeBatch.delete(meta, lastAppliedTermKey);
         writeBatch.delete(meta, lastGroupConfigKey);
 
         writeBatch.delete(meta, partitionIdKey(partitionId));
 
-        writeBatch.deleteRange(cf, partitionStartPrefix(), 
partitionEndPrefix());
+        writeBatch.deleteRange(helper.partCf, helper.partitionStartPrefix(), 
helper.partitionEndPrefix());
+
+        gc.deleteQueue(writeBatch);
+    }
+
+    @Override
+    public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp 
lowWatermark) {
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+            try {
+                return gc.pollForVacuum(requireWriteBatch(), lowWatermark);
+            } catch (RocksDBException e) {
+                throw new StorageException("Failed to collect garbage: " + 
createStorageInfo(), e);
+            }
+        });
     }
 
     @Override
@@ -1027,7 +1027,9 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
         busyLock.block();
 
-        RocksUtils.closeAll(persistedTierReadOpts, readOpts, writeOpts, 
scanReadOptions, upperBoundReadOpts, upperBound);
+        RocksUtils.closeAll(persistedTierReadOpts, readOpts, writeOpts);
+
+        helper.close();
     }
 
     private WriteBatchWithIndex requireWriteBatch() {
@@ -1046,44 +1048,13 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     private ByteBuffer prepareHeapKeyBuf(RowId rowId) {
         assert rowId.partitionId() == partitionId : rowId;
 
-        ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().position(0);
+        ByteBuffer keyBuf = HEAP_KEY_BUFFER.get().rewind();
 
         keyBuf.putShort((short) rowId.partitionId());
-        keyBuf.putLong(normalize(rowId.mostSignificantBits()));
-        keyBuf.putLong(normalize(rowId.leastSignificantBits()));
 
-        return keyBuf;
-    }
+        helper.putRowId(keyBuf, rowId);
 
-    /**
-     * Converts signed long into a new long value, that when written in Big 
Endian, will preserve the comparison order if compared
-     * lexicographically as an array of unsigned bytes. For example, values 
{@code -1} and {@code 0}, when written in BE, will become
-     * {@code 0xFF..F} and {@code 0x00..0}, and lose their ascending order.
-     *
-     * <p/>Flipping the sign bit will change the situation: {@code -1 -> 
0x7F..F} and {@code 0 -> 0x80..0}.
-     */
-    private static long normalize(long value) {
-        return value ^ (1L << 63);
-    }
-
-    /**
-     * Writes a timestamp into a byte buffer, in descending lexicographical 
bytes order.
-     */
-    private static void putTimestamp(ByteBuffer buf, HybridTimestamp ts) {
-        assert buf.order() == KEY_BYTE_ORDER;
-
-        // "bitwise negation" turns ascending order into a descending one.
-        buf.putLong(~ts.getPhysical());
-        buf.putInt(~ts.getLogical());
-    }
-
-    private static HybridTimestamp readTimestamp(ByteBuffer keyBuf) {
-        assert keyBuf.order() == KEY_BYTE_ORDER;
-
-        long physical = ~keyBuf.getLong(ROW_PREFIX_SIZE);
-        int logical = ~keyBuf.getInt(ROW_PREFIX_SIZE + Long.BYTES);
-
-        return new HybridTimestamp(physical, logical);
+        return keyBuf;
     }
 
     private static void putShort(byte[] array, int off, short value) {
@@ -1102,7 +1073,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     /**
      * Checks iterator validity, including both finished iteration and 
occurred exception.
      */
-    private static boolean invalid(RocksIterator it) {
+    static boolean invalid(RocksIterator it) {
         boolean invalid = !it.isValid();
 
         if (invalid) {
@@ -1180,18 +1151,14 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
      * Creates a prefix of all keys in the given partition.
      */
     public byte[] partitionStartPrefix() {
-        return unsignedShortAsBytes(partitionId);
+        return helper.partitionStartPrefix();
     }
 
     /**
      * Creates a prefix of all keys in the next partition, used as an 
exclusive bound.
      */
     public byte[] partitionEndPrefix() {
-        return unsignedShortAsBytes(partitionId + 1);
-    }
-
-    private static byte[] unsignedShortAsBytes(int value) {
-        return new byte[] {(byte) (value >>> 8), (byte) value};
+        return helper.partitionEndPrefix();
     }
 
     /**
@@ -1202,7 +1169,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     private abstract class BasePartitionTimestampCursor implements 
PartitionTimestampCursor {
-        protected final RocksIterator it = db.newIterator(cf, scanReadOptions);
+        protected final RocksIterator it = db.newIterator(helper.partCf, 
helper.scanReadOpts);
 
         // Here's seek buffer itself. Originally it contains a valid partition 
id, row id payload that's filled with zeroes, and maybe
         // a timestamp value. Zero row id guarantees that it's 
lexicographically less than or equal to any other row id stored in the
@@ -1211,9 +1178,9 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         //  - no one guarantees that there will only be a single cursor;
         //  - no one guarantees that returned cursor will not be used by other 
threads.
         // The thing is, we need this buffer to preserve its content between 
invocations of "hasNext" method.
-        protected final ByteBuffer seekKeyBuf = 
ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER).putShort((short) 
partitionId);
+        final ByteBuffer seekKeyBuf = 
allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
 
-        protected RowId currentRowId;
+        RowId currentRowId;
 
         /** Cached value for {@link #next()} method. Also optimizes the code 
of {@link #hasNext()}. */
         protected ReadResult next;
@@ -1254,6 +1221,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             });
         }
 
+        @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException") // 
It can.
         @Override
         public final ReadResult next() {
             return busy(() -> {
@@ -1293,10 +1261,10 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             currentRowId = null;
 
             // Prepare direct buffer slice to read keys from the iterator.
-            ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().position(0);
+            ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().rewind();
 
             while (true) {
-                currentKeyBuffer.position(0);
+                currentKeyBuffer.rewind();
 
                 // At this point, seekKeyBuf should contain row id that's 
above the one we already scanned, but not greater than any
                 // other row id in partition. When we start, row id is filled 
with zeroes. Value during the iteration is described later
@@ -1361,7 +1329,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
                         if (matches(rowId, key)) {
                             // This is a next version of current row.
-                            nextCommitTimestamp = readTimestamp(key);
+                            nextCommitTimestamp = readTimestampDesc(key);
                         }
                     }
                 }
@@ -1374,7 +1342,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
                 if (!isWriteIntent) {
                     // There is no write-intent, return latest committed row.
-                    readResult = wrapCommittedValue(rowId, valueBytes, 
readTimestamp(currentKeyBuffer));
+                    readResult = wrapCommittedValue(rowId, valueBytes, 
readTimestampDesc(currentKeyBuffer));
                 } else {
                     readResult = wrapUncommittedValue(rowId, valueBytes, 
nextCommitTimestamp);
                 }
@@ -1392,7 +1360,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     private final class ScanByTimestampCursor extends 
BasePartitionTimestampCursor {
         private final HybridTimestamp timestamp;
 
-        public ScanByTimestampCursor(HybridTimestamp timestamp) {
+        private ScanByTimestampCursor(HybridTimestamp timestamp) {
             this.timestamp = timestamp;
         }
 
@@ -1411,7 +1379,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
             currentRowId = null;
 
             // Prepare direct buffer slice to read keys from the iterator.
-            ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+            ByteBuffer directBuffer = MV_KEY_BUFFER.get().rewind();
 
             while (true) {
                 //TODO IGNITE-18201 Remove copying.
@@ -1422,7 +1390,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                 }
 
                 // We need to figure out what current row id is.
-                it.key(directBuffer.position(0));
+                it.key(directBuffer.rewind());
 
                 RowId rowId = getRowId(directBuffer);
 
@@ -1494,7 +1462,7 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
      *
      * @throws StorageRebalanceException If there was an error when aborting 
the rebalance.
      */
-    void abortReblance(WriteBatch writeBatch) {
+    void abortRebalance(WriteBatch writeBatch) {
         if (!state.compareAndSet(StorageState.REBALANCE, 
StorageState.RUNNABLE)) {
             throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
         }
@@ -1533,7 +1501,9 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
         writeBatch.delete(meta, lastGroupConfigKey);
         writeBatch.delete(meta, partitionIdKey(partitionId));
-        writeBatch.deleteRange(cf, partitionStartPrefix(), 
partitionEndPrefix());
+        writeBatch.deleteRange(helper.partCf, helper.partitionStartPrefix(), 
helper.partitionEndPrefix());
+
+        gc.deleteQueue(writeBatch);
     }
 
     private void saveLastApplied(WriteBatch writeBatch, long lastAppliedIndex, 
long lastAppliedTerm) throws RocksDBException {
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 39987f5153..4767d1314d 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.GC_QUEUE_CF_NAME;
 import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.HASH_INDEX_CF_NAME;
 import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_CF_NAME;
 import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
@@ -114,6 +115,9 @@ public class RocksDbTableStorage implements MvTableStorage {
     /** Column Family handle for partition data. */
     private volatile ColumnFamily partitionCf;
 
+    /** Column Family handle for GC queue. */
+    private volatile ColumnFamily gcQueueCf;
+
     /** Column Family handle for Hash Index data. */
     private volatile ColumnFamily hashIndexCf;
 
@@ -186,6 +190,13 @@ public class RocksDbTableStorage implements MvTableStorage 
{
         return meta.columnFamily().handle();
     }
 
+    /**
+     * Returns a column family handle for GC queue.
+     */
+    public ColumnFamilyHandle gcQueueHandle() {
+        return gcQueueCf.handle();
+    }
+
     @Override
     public TableConfiguration configuration() {
         return tableCfg;
@@ -243,6 +254,11 @@ public class RocksDbTableStorage implements MvTableStorage 
{
 
                             break;
 
+                        case GC_QUEUE:
+                            gcQueueCf = cf;
+
+                            break;
+
                         case HASH_INDEX:
                             hashIndexCf = cf;
 
@@ -333,6 +349,7 @@ public class RocksDbTableStorage implements MvTableStorage {
 
         resources.add(meta.columnFamily().handle());
         resources.add(partitionCf.handle());
+        resources.add(gcQueueCf.handle());
         resources.add(hashIndexCf.handle());
         resources.addAll(
                 sortedIndices.values().stream()
@@ -579,7 +596,7 @@ public class RocksDbTableStorage implements MvTableStorage {
     /**
      * Returns a list of Column Families' names that belong to a RocksDB 
instance in the given path.
      *
-     * @return Map with column families names.
+     * @return List with column families names.
      * @throws StorageException If something went wrong.
      */
     private List<String> getExistingCfNames() {
@@ -593,7 +610,7 @@ public class RocksDbTableStorage implements MvTableStorage {
 
             // even if the database is new (no existing Column Families), we 
return the names of mandatory column families, that
             // will be created automatically.
-            return existingNames.isEmpty() ? List.of(META_CF_NAME, 
PARTITION_CF_NAME, HASH_INDEX_CF_NAME) : existingNames;
+            return existingNames.isEmpty() ? List.of(META_CF_NAME, 
PARTITION_CF_NAME, GC_QUEUE_CF_NAME, HASH_INDEX_CF_NAME) : existingNames;
         } catch (RocksDBException e) {
             throw new StorageException(
                     "Failed to read list of column families names for the 
RocksDB instance located at path " + absolutePathStr, e
@@ -618,12 +635,18 @@ public class RocksDbTableStorage implements 
MvTableStorage {
     private ColumnFamilyDescriptor cfDescriptorFromName(String cfName) {
         switch (ColumnFamilyType.fromCfName(cfName)) {
             case META:
-            case PARTITION:
+            case GC_QUEUE:
                 return new ColumnFamilyDescriptor(
                         cfName.getBytes(UTF_8),
                         new ColumnFamilyOptions()
                 );
 
+            case PARTITION:
+                return new ColumnFamilyDescriptor(
+                        cfName.getBytes(UTF_8),
+                        new 
ColumnFamilyOptions().useFixedLengthPrefixExtractor(PartitionDataHelper.ROW_PREFIX_SIZE)
+                );
+
             case HASH_INDEX:
                 return new ColumnFamilyDescriptor(
                         cfName.getBytes(UTF_8),
@@ -703,7 +726,7 @@ public class RocksDbTableStorage implements MvTableStorage {
             }
 
             try (WriteBatch writeBatch = new WriteBatch()) {
-                mvPartitionStorage.abortReblance(writeBatch);
+                mvPartitionStorage.abortRebalance(writeBatch);
 
                 getHashIndexStorages(partitionId).forEach(index -> 
index.abortReblance(writeBatch));
                 getSortedIndexStorages(partitionId).forEach(index -> 
index.abortReblance(writeBatch));
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
new file mode 100644
index 0000000000..0c0cb220a7
--- /dev/null
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.file.Path;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test implementation for {@link RocksDbStorageEngine}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbMvPartitionStorageGcTest extends 
AbstractMvPartitionStorageGcTest {
+    @InjectConfiguration
+    private RocksDbStorageEngineConfiguration engineConfig;
+
+    @WorkDirectory
+    private Path workDir;
+
+    @Override
+    protected StorageEngine createEngine() {
+        return new RocksDbStorageEngine(engineConfig, workDir);
+    }
+}
diff --git a/modules/storage-rocksdb/tech-notes/garbage-collection.md 
b/modules/storage-rocksdb/tech-notes/garbage-collection.md
new file mode 100644
index 0000000000..bed34edf42
--- /dev/null
+++ b/modules/storage-rocksdb/tech-notes/garbage-collection.md
@@ -0,0 +1,95 @@
+# Garbage Collection in the RocksDB partition storage
+
+This document describes the process of Garbage Collection in the RocksDB-based storage.
+The goal of this process is to remove stale data based on a given timestamp.
+
+## Garbage Collection algorithm
+
+It's important to understand when garbage collection is actually needed. An older version of a row can
+be garbage collected if and only if there is a newer version of that row and the newer version's timestamp
+is below the low watermark. The low watermark is the minimal timestamp that a running transaction might have.
+
+Consider the following example:  
+*Note that **Record number** is a hypothetical column that helps refer to specific entries; there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp |
+|---------------|--------|-----------|
+| 1             | Foo    | 1         |
+| 2             | Foo    | 10        |
+
+In this case, we can only remove record 1 if the low watermark is 10 or higher. If the watermark is at 9,
+a transaction with timestamp 9 can still occur, so record 1 is still needed.  
+This is why we only add an entry to the GC queue when a previous version exists, and why the
+queue entry carries the timestamp of the next (newer) version.
+
+Let's review another example:  
+*Note that **Is tombstone**, like **Record number**, is a hypothetical column shown for illustration; there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp | Is tombstone |
+|---------------|--------|-----------|--------------|
+| 1             | Foo    | 1         | False        |
+| 2             | Foo    | 10        | True         |
+| 3             | Foo    | 20        | False        |
+
+Everything said above also applies to this example; in addition, we can remove record 2, because it is
+a tombstone. Once the watermark is 10 or higher, a transaction with a timestamp of at least 10
+either gets an empty value (if its timestamp is less than 20) or sees the newer version at timestamp 20.
+
+So to sum up, the algorithm looks like this:
+
+1. Get an element from the GC queue; exit if the queue is empty
+2. If the element's timestamp does not exceed the low watermark, add the element to the batch for removal from RocksDB; exit otherwise
+3. Find the element in the data column family that corresponds to the GC queue element; if no such value exists, exit
+4. If it is a tombstone, add it to the batch for removal as well
+5. Seek the previous version; if it doesn't exist, exit
+6. Add that previous version to the batch for removal
+
+Note that, apart from the queue being empty, there are two cases in which we can exit prematurely.  
+Because GC can run in parallel, we might not find the value that triggered the addition to the GC queue,
+and/or the value that needs to be garbage collected: if two parallel threads got the same element from the
+queue, one of them might have already finished the GC and removed the elements.
+
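The steps above, together with the premature-exit cases, can be sketched with a small in-memory model. This is an illustrative sketch only: `GcSketch`, `commit`, and `vacuum` are hypothetical names, `TreeMap` stands in for the RocksDB column families, and a `null` value models a tombstone.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative in-memory model of the GC algorithm; not the actual Ignite classes. */
public class GcSketch {
    /** rowId -> (timestamp -> value); a null value models a tombstone. */
    static final TreeMap<Long, TreeMap<Long, String>> data = new TreeMap<>();

    /** GC queue: next-version timestamp -> rowId (the real key also includes the row id). */
    static final TreeMap<Long, Long> gcQueue = new TreeMap<>();

    static void commit(long rowId, long ts, String value) {
        TreeMap<Long, String> versions = data.computeIfAbsent(rowId, k -> new TreeMap<>());
        Map.Entry<Long, String> prev = versions.lastEntry();
        boolean newIsTombstone = value == null;
        boolean prevIsTombstone = prev != null && prev.getValue() == null;

        // Enqueue for GC only if a previous version exists, unless both are tombstones.
        if (prev != null && !(newIsTombstone && prevIsTombstone)) {
            gcQueue.put(ts, rowId);
        }

        // Don't store consecutive tombstones.
        if (!(newIsTombstone && prevIsTombstone)) {
            versions.put(ts, value);
        }
    }

    static void vacuum(long lowWatermark) {
        while (!gcQueue.isEmpty()) {
            Map.Entry<Long, Long> head = gcQueue.firstEntry();
            if (head.getKey() > lowWatermark) {
                return; // Step 2: queue head is above the low watermark.
            }
            gcQueue.remove(head.getKey());

            TreeMap<Long, String> versions = data.get(head.getValue());
            if (versions == null) {
                continue; // Step 3: already collected by a parallel GC run.
            }
            Map.Entry<Long, String> trigger = versions.floorEntry(head.getKey());
            if (trigger == null) {
                continue; // Step 3: the triggering value is gone.
            }
            Map.Entry<Long, String> older = versions.lowerEntry(trigger.getKey());
            if (trigger.getValue() == null) {
                versions.remove(trigger.getKey()); // Step 4: remove the tombstone itself.
            }
            if (older != null) {
                versions.remove(older.getKey()); // Steps 5-6: remove the previous version.
            }
        }
    }
}
```

Running the tombstone example from below through this model leaves two stored versions after the three commits, none removed at watermark 9, and both removed at watermark 10.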
+## Garbage Collection queue
+
+We store the garbage collector's queue in a dedicated RocksDB column family in the following
+format. The key:
+
+| Partition id | Timestamp                                 | Row id         |
+|--------------|-------------------------------------------|----------------|
+| 2-byte       | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We could make the row id the value,
+because processing the queue in ascending order only requires the timestamp;
+however, multiple row ids can have the same timestamp, so making the row id a value would require
+storing a list of row ids, which would make the commit in this implementation of the storage more
+sophisticated and, probably, less performant.
+
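Assuming the field widths from the table above and plain big-endian encoding (which yields the ascending timestamp order the queue is processed in), such a key could be assembled as follows; `GcQueueKeySketch` and its parameters are illustrative, not the actual Ignite code:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class GcQueueKeySketch {
    static final int KEY_SIZE = 2 + 12 + 16; // partition id + timestamp + row id

    static byte[] gcQueueKey(int partitionId, long physical, int logical, UUID rowId) {
        ByteBuffer buf = ByteBuffer.allocate(KEY_SIZE); // big-endian by default
        buf.putShort((short) partitionId);           // 2-byte partition id
        buf.putLong(physical);                       // 8-byte physical timestamp
        buf.putInt(logical);                         // 4-byte logical timestamp
        buf.putLong(rowId.getMostSignificantBits()); // 16-byte row id (uuid)
        buf.putLong(rowId.getLeastSignificantBits());
        return buf.array();
    }
}
```

With this layout, keys of the same partition compare lexicographically by timestamp first, then by row id, which is exactly the processing order the queue needs.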
+Each time a row is committed to the storage, we check whether there is already a value for this row.
+If there is one, and it is not the case that both it and the new version are tombstones, we put the
+new commit's timestamp and row id into the GC queue. To understand why we put the new value's timestamp,
+please refer to the Garbage Collection [algorithm](#garbage-collection-algorithm).  
+The queue is updated along with the data column family in a single write batch and is destroyed when
+the storage is cleared or destroyed.
+
+## Storage implications
+
+To save space, we don't store consecutive tombstones.
+For example, if a user removes a certain row twice:
+
+```
+storage.put(key, value); // Time 1
+storage.delete(key); // Time 10
+storage.delete(key); // Time 20
+```
+
+there should be only one row with a value and one row with a tombstone, the tombstone being
+the older of the two deletes (timestamp 10). This also simplifies the processing of the garbage
+collection queue. So in the storage we will see something like:
+
+| Key | Value       | Timestamp |
+|-----|-------------|-----------|
+| Foo | Bar         | 1         |
+| Foo | <tombstone> | 10        |
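The commit-time check behind this rule boils down to a single predicate: a version is written unless both it and the previous version are tombstones. The sketch below is illustrative only; `TombstoneCheck` and `shouldWriteVersion` are hypothetical names, not the actual Ignite API.

```java
public class TombstoneCheck {
    /**
     * Returns whether a new version should be written. {@code prevIsTombstone} is
     * {@code null} when no previous version exists for the row id.
     */
    static boolean shouldWriteVersion(boolean newIsTombstone, Boolean prevIsTombstone) {
        // Skip the write only for a tombstone that directly follows another tombstone.
        return !(newIsTombstone && Boolean.TRUE.equals(prevIsTombstone));
    }
}
```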

